Repository: incubator-hawq
Updated Branches:
refs/heads/master dada9ba99 -> decbe0deb
HAWQ-966. Adjust libyarn output log messages.
Lower the level of some log messages to DEBUG1 (a few to DEBUG2),
and raise some failure messages from INFO to WARNING.
Property 'yarn.client.log.severity' in yarn-client.xml can be
used to change the output log level. The default value is 'INFO';
other values include 'DEBUG1', 'WARNING', etc.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/decbe0de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/decbe0de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/decbe0de
Branch: refs/heads/master
Commit: decbe0debdd024f759b06f83fb181a67d934c75e
Parents: dada9ba
Author: Wen Lin <[email protected]>
Authored: Mon Aug 1 10:22:25 2016 +0800
Committer: Wen Lin <[email protected]>
Committed: Mon Aug 1 10:22:25 2016 +0800
----------------------------------------------------------------------
.../src/libyarnclient/ApplicationClient.cpp | 13 +-
.../src/libyarnclient/ApplicationMaster.cpp | 13 +-
.../src/libyarnclient/ContainerManagement.cpp | 8 +-
.../libyarn/src/libyarnclient/LibYarnClient.cpp | 965 ++++++++++---------
4 files changed, 506 insertions(+), 493 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/decbe0de/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
b/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
index 64acbe9..819514f 100644
--- a/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
+++ b/depends/libyarn/src/libyarnclient/ApplicationClient.cpp
@@ -50,7 +50,7 @@ std::vector<RMInfo> RMInfo::getHARMInfo(const Yarn::Config &
conf, const char* n
retval[i].setPort(rm[1]);
}
} catch (const Yarn::YarnConfigNotFound &e) {
- LOG(INFO, "Yarn RM HA is not configured.");
+ LOG(DEBUG1, "Yarn RM HA is not configured.");
}
return retval;
@@ -63,6 +63,7 @@ ApplicationClient::ApplicationClient(string &user, string
&host, string &port) {
Yarn::Internal::shared_ptr<Yarn::Config> conf =
DefaultConfig().getConfig();
Yarn::Internal::SessionConfig sessionConfig(*conf);
+ RootLogger.setLogSeverity(sessionConfig.getLogSeverity());
LOG(INFO, "ApplicationClient session auth method : %s",
sessionConfig.getRpcAuthMethod().c_str());
@@ -135,7 +136,7 @@ std::shared_ptr<ApplicationClientProtocol>
ApplicationClient::getActiveAppClientProto(uint32_t & oldValue) {
lock_guard<mutex> lock(this->mut);
- LOG(INFO, "ApplicationClient::getActiveAppClientProto is called.");
+ LOG(DEBUG2, "ApplicationClient::getActiveAppClientProto is called.");
if (appClientProtos.empty()) {
LOG(WARNING, "The vector of ApplicationClientProtocol is empty.");
@@ -143,7 +144,8 @@ std::shared_ptr<ApplicationClientProtocol>
}
oldValue = currentAppClientProto;
- LOG(INFO, "ApplicationClient::getActiveAppClientProto, current is %d.",
currentAppClientProto);
+ LOG(DEBUG1, "ApplicationClient::getActiveAppClientProto, current is %d.",
+ currentAppClientProto);
return appClientProtos[currentAppClientProto % appClientProtos.size()];
}
@@ -156,12 +158,13 @@ void
ApplicationClient::failoverToNextAppClientProto(uint32_t oldValue){
++currentAppClientProto;
currentAppClientProto = currentAppClientProto % appClientProtos.size();
- LOG(INFO, "ApplicationClient::failoverToNextAppClientProto, current is
%d.", currentAppClientProto);
+ LOG(INFO, "ApplicationClient::failoverToNextAppClientProto, current is
%d.",
+ currentAppClientProto);
}
static void HandleYarnFailoverException(const Yarn::YarnFailoverException & e)
{
try {
- Yarn::rethrow_if_nested(e);
+ Yarn::rethrow_if_nested(e);
} catch (...) {
NESTED_THROW(Yarn::YarnRpcException, "%s", e.what());
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/decbe0de/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
b/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
index 08d8cdb..964ac0e 100644
--- a/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
+++ b/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp
@@ -86,8 +86,9 @@ ApplicationMaster::ApplicationMaster(string &schedHost,
string &schedPort,
std::shared_ptr<ApplicationMasterProtocol>(
new ApplicationMasterProtocol(rmInfos[i].getHost(),
rmInfos[i].getPort(), tokenService, sessionConfig,
rpcAuth)));
- LOG(INFO, "ApplicationMaster finds a candidate RM scheduler,
host:%s, port:%s",
- rmInfos[i].getHost().c_str(),
rmInfos[i].getPort().c_str());
+ LOG(INFO,
+ "ApplicationMaster finds a candidate RM scheduler,
host:%s, port:%s",
+ rmInfos[i].getHost().c_str(),
rmInfos[i].getPort().c_str());
}
}
currentAppMasterProto = 0;
@@ -111,7 +112,8 @@ std::shared_ptr<ApplicationMasterProtocol>
}
oldValue = currentAppMasterProto;
- LOG(INFO, "ApplicationMaster::getActiveAppMasterProto, current is %d.",
currentAppMasterProto);
+ LOG(DEBUG2, "ApplicationMaster::getActiveAppMasterProto, current is %d.",
+ currentAppMasterProto);
return appMasterProtos[currentAppMasterProto % appMasterProtos.size()];
}
@@ -124,12 +126,13 @@ void
ApplicationMaster::failoverToNextAppMasterProto(uint32_t oldValue){
++currentAppMasterProto;
currentAppMasterProto = currentAppMasterProto % appMasterProtos.size();
- LOG(INFO, "ApplicationMaster::failoverToNextAppMasterProto, current is
%d.", currentAppMasterProto);
+ LOG(INFO, "ApplicationMaster::failoverToNextAppMasterProto, current is
%d.",
+ currentAppMasterProto);
}
static void HandleYarnFailoverException(const Yarn::YarnFailoverException & e)
{
try {
- Yarn::rethrow_if_nested(e);
+ Yarn::rethrow_if_nested(e);
} catch (...) {
NESTED_THROW(Yarn::YarnRpcException, "%s", e.what());
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/decbe0de/depends/libyarn/src/libyarnclient/ContainerManagement.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/ContainerManagement.cpp
b/depends/libyarn/src/libyarnclient/ContainerManagement.cpp
index 915f518..b999988 100644
--- a/depends/libyarn/src/libyarnclient/ContainerManagement.cpp
+++ b/depends/libyarn/src/libyarnclient/ContainerManagement.cpp
@@ -67,7 +67,7 @@ StartContainerResponse
ContainerManagement::startContainer(Container &container,
oss << container.getNodeId().getPort();
string port(oss.str());
- LOG(INFO,
+ LOG(DEBUG1,
"ContainerManagement::startContainer, is going to
connect to NM [%s:%s] to start container",
host.c_str(), port.c_str());
@@ -101,7 +101,7 @@ StartContainerResponse
ContainerManagement::startContainer(Container &container,
StartContainerResponse scResponse;
scResponse.setServicesMetaData(scsResponse.getServicesMetaData());
- LOG(INFO,
+ LOG(DEBUG1,
"ContainerManagement::startContainer, after start a
container, id:%ld on NM [%s:%s]",
container.getId().getId(), host.c_str(), port.c_str());
@@ -131,7 +131,7 @@ void ContainerManagement::stopContainer(Container
&container, Token &nmToken) {
oss << container.getNodeId().getPort();
string port(oss.str());
- LOG(INFO,
+ LOG(DEBUG1,
"ContainerManagement::stopContainer, is going to
connect to NM [%s:%s] to stop container",
host.c_str(), port.c_str());
@@ -174,7 +174,7 @@ ContainerStatus
ContainerManagement::getContainerStatus(Container &container,
oss << container.getNodeId().getPort();
string port(oss.str());
- LOG(INFO,
+ LOG(DEBUG1,
"ContainerManagement, is going to connect to NM [%s:%s]
to getContainerStatus container",
host.c_str(), port.c_str());
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/decbe0de/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
----------------------------------------------------------------------
diff --git a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
index 823d116..5fa91e7 100644
--- a/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
+++ b/depends/libyarn/src/libyarnclient/LibYarnClient.cpp
@@ -37,132 +37,130 @@ using namespace Yarn::Internal;
namespace libyarn {
LibYarnClient::LibYarnClient(string &user, string &rmHost, string &rmPort,
- string &schedHost, string &schedPort, string &amHost,
- int32_t amPort, string &am_tracking_url,int heartbeatInterval) :
- amUser(user), schedHost(schedHost), schedPort(schedPort),
amHost(amHost),
- amPort(amPort), am_tracking_url(am_tracking_url),
-
heartbeatInterval(heartbeatInterval),response_id(0),clientJobId(""),
- keepRun(false), needHeartbeatAlive(false){
- pthread_mutex_init( &(heartbeatLock), NULL );
-
- amrmClient = NULL;
- appClient = (void*) new ApplicationClient(user, rmHost, rmPort);
- nmClient = (void*) new ContainerManagement();
+ string &schedHost, string &schedPort, string &amHost, int32_t amPort,
+ string &am_tracking_url, int heartbeatInterval) :
+ amUser(user), schedHost(schedHost), schedPort(schedPort), amHost(
+ amHost), amPort(amPort), am_tracking_url(am_tracking_url),
heartbeatInterval(
+ heartbeatInterval), response_id(0), clientJobId(""), keepRun(
+ false), needHeartbeatAlive(false) {
+ pthread_mutex_init(&(heartbeatLock), NULL);
+ amrmClient = NULL;
+ appClient = (void*) new ApplicationClient(user, rmHost, rmPort);
+ nmClient = (void*) new ContainerManagement();
}
#ifdef MOCKTEST
LibYarnClient::LibYarnClient(string &user,string &rmHost, string &rmPort,
string &schedHost,
- string &schedPort, string &amHost, int32_t amPort,
- string &am_tracking_url, int
heartbeatInterval,Mock::TestLibYarnClientStub *stub):
- amUser(user),schedHost(schedHost), schedPort(schedPort),
amHost(amHost),
- amPort(amPort), am_tracking_url(am_tracking_url),
- heartbeatInterval(heartbeatInterval),clientJobId(""),
- keepRun(false), needHeartbeatAlive(false){
- pthread_mutex_init( &(heartbeatLock), NULL );
- libyarnStub = stub;
- appClient = (void*) libyarnStub->getApplicationClient();
- amrmClient = (void*) libyarnStub->getApplicationMaster();
- nmClient = (void*) libyarnStub->getContainerManagement();
+ string &schedPort, string &amHost, int32_t amPort,
+ string &am_tracking_url, int
heartbeatInterval,Mock::TestLibYarnClientStub *stub):
+ amUser(user),schedHost(schedHost), schedPort(schedPort),
amHost(amHost),
+ amPort(amPort), am_tracking_url(am_tracking_url),
+ heartbeatInterval(heartbeatInterval),clientJobId(""),
+ keepRun(false), needHeartbeatAlive(false) {
+ pthread_mutex_init( &(heartbeatLock), NULL );
+ libyarnStub = stub;
+ appClient = (void*) libyarnStub->getApplicationClient();
+ amrmClient = (void*) libyarnStub->getApplicationMaster();
+ nmClient = (void*) libyarnStub->getContainerManagement();
}
#endif
LibYarnClient::~LibYarnClient() {
#ifndef MOCKTEST
- if ( keepRun ) {
- // No need to run heart-beat thread now.
- keepRun = false;
- void *thrc = NULL;
- int rc = pthread_join(heartbeatThread, &thrc);
- if ( rc != 0 ) {
- LOG(INFO, "LibYarnClient::~LibYarnClient, fail to join
heart-beat thread. "
- "error code %d", rc);
- }
- else {
- LOG(INFO, "LibYarnClient::~LibYarnClient, join
heart-beat thread successfully.");
- }
- }
+ if (keepRun) {
+ // No need to run heart-beat thread now.
+ keepRun = false;
+ void *thrc = NULL;
+ int rc = pthread_join(heartbeatThread, &thrc);
+ if (rc != 0) {
+ LOG(DEBUG1, "LibYarnClient::~LibYarnClient, fail to join
heart-beat thread. "
+ "error code %d", rc);
+ } else {
+ LOG(DEBUG1, "LibYarnClient::~LibYarnClient, join heart-beat thread
successfully.");
+ }
+ }
#endif
- if (amrmClient != NULL){
- delete (ApplicationMaster*)amrmClient;
- }
- delete (ApplicationClient*)appClient;
- delete (ContainerManagement*)nmClient;
+ if (amrmClient != NULL) {
+ delete (ApplicationMaster*) amrmClient;
+ }
+ delete (ApplicationClient*) appClient;
+ delete (ContainerManagement*) nmClient;
}
string LibYarnClient::getErrorMessage() {
return errorMessage;
}
-void LibYarnClient::setErrorMessage(string errorMsg){
+void LibYarnClient::setErrorMessage(string errorMsg) {
errorMessage = errorMsg;
}
bool LibYarnClient::isJobHealthy() {
- return keepRun;
+ return keepRun;
}
list<ResourceRequest>& LibYarnClient::getAskRequests() {
- return askRequests;
+ return askRequests;
}
void LibYarnClient::clearAskRequests() {
- LOG(INFO, "LibYarnClient::clear ask requests.");
- askRequests.clear();
+ LOG(DEBUG1, "LibYarnClient::clear ask requests.");
+ askRequests.clear();
}
void* heartbeatFunc(void* args) {
- int failcounter = 0;
- int retry = 2;
- LibYarnClient *client = (LibYarnClient*)args;
-
- while (client->keepRun) {
- try {
- client->dummyAllocate();
- failcounter = 0;
- } catch (const ApplicationMasterNotRegisteredException &e) {
- /*
- * In case catch this exception,
- * heartbeat thread should exits, and re-register AM.
- */
- LOG(WARNING, "LibYarnClient::heartbeatFunc, dummy
allocation "
- "catch
ApplicationMasterNotRegisteredException. %s",
- e.msg());
- client->keepRun = false;
- break;
- } catch (const YarnException &e) {
- LOG(WARNING, "LibYarnClient::heartbeatFunc, dummy
allocation "
- "is not correctly executed
with exception raised. %s",
- e.msg());
- failcounter++;
- if ( failcounter > retry ) {
- /* In case retry too many times with
errors/exceptions, this
- * thread will return. LibYarn has to
re-register application
- * and start the heartbeat thread again.
- */
- LOG(WARNING, "LibYarnClient::heartbeatFunc,
there are too many "
- "failures raised. This
heart-beat thread exits now.");
- client->keepRun = false;
- break;
- }
- }
- usleep((client->heartbeatInterval) * 1000);
- }
-
- LOG(INFO, "LibYarnClient::heartbeatFunc, goes into exit phase.");
- return (void *)0;
+ int failcounter = 0;
+ int retry = 2;
+ LibYarnClient *client = (LibYarnClient*) args;
+
+ while (client->keepRun) {
+ try {
+ client->dummyAllocate();
+ failcounter = 0;
+ } catch (const ApplicationMasterNotRegisteredException &e) {
+ /*
+ * In case catch this exception,
+ * heartbeat thread should exits, and re-register AM.
+ */
+ LOG(WARNING, "LibYarnClient::heartbeat dummy allocation "
+ "catch ApplicationMasterNotRegisteredException. %s",
+ e.msg());
+ client->keepRun = false;
+ break;
+ } catch (const YarnException &e) {
+ LOG(WARNING, "LibYarnClient::heartbeat dummy allocation "
+ "is not correctly executed with exception raised. %s",
+ e.msg());
+ failcounter++;
+ if (failcounter > retry) {
+ /* In case retry too many times with errors/exceptions, this
+ * thread will return. LibYarn has to re-register application
+ * and start the heartbeat thread again.
+ */
+ LOG(WARNING, "LibYarnClient::heartbeatFunc, there are too many
"
+ "failures raised. This heart-beat thread exits
now.");
+ client->keepRun = false;
+ break;
+ }
+ }
+ usleep((client->heartbeatInterval) * 1000);
+ }
+
+ LOG(INFO, "LibYarnClient::heartbeatFunc, goes into exit phase.");
+ return (void *) 0;
}
-int LibYarnClient::createJob(string &jobName, string &queue,string &jobId) {
- try{
- //Only one jobId for the client right now.
- if (clientJobId != ""){
- throw std::invalid_argument( "Exist an application for the
client");
- }
- ApplicationClient *applicationClient = (ApplicationClient*)appClient;
+int LibYarnClient::createJob(string &jobName, string &queue, string &jobId) {
+ try {
+ //Only one jobId for the client right now.
+ if (clientJobId != ""){
+ throw std::invalid_argument( "Exist an application for the
client");
+ }
+ ApplicationClient *applicationClient = (ApplicationClient*) appClient;
//1. getNewApplication
ApplicationId appId = applicationClient->getNewApplication();
- LOG(INFO, "LibYarnClient::createJob, getNewApplication finished,
appId:[clusterTimeStamp:%lld,id:%d]",
- appId.getClusterTimestamp(), appId.getId());
+ LOG(DEBUG1, "LibYarnClient::createJob, getNewApplication finished,
appId:[clusterTimeStamp:%lld,id:%d]",
+ appId.getClusterTimestamp(), appId.getId());
//2. submitApplication
ApplicationSubmissionContext appSubmitCtx;
@@ -180,16 +178,17 @@ int LibYarnClient::createJob(string &jobName, string
&queue,string &jobId) {
appSubmitCtx.setMaxAppAttempts(1);
applicationClient->submitApplication(appSubmitCtx);
- LOG(INFO, "LibYarnClient::createJob, submitApplication finished");
+ LOG(DEBUG1, "LibYarnClient::createJob, submitApplication finished");
//3. wait util AM is ACCEPTED and return the AMRMToken
ApplicationReport report;
int retry = 10;
while (retry > 0) {
report = applicationClient->getApplicationReport(appId);
- LOG(INFO,"LibYarnClient::createJob,
appId[cluster_timestamp:%lld,id:%d], appState:%d",
- appId.getClusterTimestamp(), appId.getId(),
report.getYarnApplicationState());
- if ((report.getAMRMToken().getPassword() != "") &&
report.getYarnApplicationState() == YarnApplicationState::ACCEPTED) {
+ LOG(DEBUG1, "LibYarnClient::createJob,
appId[cluster_timestamp:%lld,id:%d], appState:%d",
+ appId.getClusterTimestamp(), appId.getId(),
report.getYarnApplicationState());
+ if ((report.getAMRMToken().getPassword() != "") &&
+ report.getYarnApplicationState() ==
YarnApplicationState::ACCEPTED) {
break;
} else {
retry--;
@@ -205,20 +204,21 @@ int LibYarnClient::createJob(string &jobName, string
&queue,string &jobId) {
//4.1 new ApplicationMaster
Token token = report.getAMRMToken();
UserInfo user;
- if (applicationClient->getMethod() == SIMPLE)
- user = UserInfo::LocalUser();
- else if (applicationClient->getMethod() == KERBEROS) {
-
user.setEffectiveUser(applicationClient->getPrincipal());
- user.setRealUser(applicationClient->getUser());
- } else {
- LOG(WARNING, "LibYarnClient::createJob: unsupported RPC
method:%d. ", applicationClient->getMethod());
- }
-
- Yarn::Token AMToken;
- AMToken.setIdentifier(token.getIdentifier());
- AMToken.setKind(token.getKind());
- AMToken.setPassword(token.getPassword());
- AMToken.setService(token.getService());
+ if (applicationClient->getMethod() == SIMPLE) {
+ user = UserInfo::LocalUser();
+ } else if (applicationClient->getMethod() == KERBEROS) {
+ user.setEffectiveUser(applicationClient->getPrincipal());
+ user.setRealUser(applicationClient->getUser());
+ } else {
+ LOG(WARNING, "LibYarnClient::createJob: unsupported RPC method:%d.
",
+ applicationClient->getMethod());
+ }
+
+ Yarn::Token AMToken;
+ AMToken.setIdentifier(token.getIdentifier());
+ AMToken.setKind(token.getKind());
+ AMToken.setPassword(token.getPassword());
+ AMToken.setService(token.getService());
user.addToken(AMToken);
#ifndef MOCKTEST
@@ -227,60 +227,60 @@ int LibYarnClient::createJob(string &jobName, string
&queue,string &jobId) {
#endif
//4.2 register to RM scheduler as AM
- ((ApplicationMaster*) amrmClient)->registerApplicationMaster(amHost,
amPort,am_tracking_url);
- LOG(INFO, "LibYarnClient::createJob, registerApplicationMaster
finished");
+ ((ApplicationMaster*) amrmClient)->registerApplicationMaster(amHost,
amPort, am_tracking_url);
+ LOG(DEBUG1, "LibYarnClient::createJob, registerApplicationMaster
finished");
#ifndef MOCKTEST
keepRun = true;
//5. setup the heartbeat thread to allocate, release, heartbeat
int rc = pthread_create(&heartbeatThread, NULL, heartbeatFunc, this);
- if ( rc != 0 ) {
+ if (rc != 0) {
keepRun = false;
- LOG(INFO, "LibYarnClient::createJob, fail to create heart-beat
thread. "
- "error code %d", rc);
- throw std::runtime_error( "Fail to create heart-beat thread.");
+ LOG(WARNING, "LibYarnClient::createJob, fail to create heart-beat
thread. "
+ "error code %d", rc);
+ throw std::runtime_error("Fail to create heart-beat thread.");
}
needHeartbeatAlive = true;
#endif
- LOG(INFO,"LibYarnClient::createJob, after AM register to RM, a
heartbeat thread has been started");
+ LOG(DEBUG1, "LibYarnClient::createJob, after AM register to RM, a
heartbeat thread has been started");
//6. return jobId
stringstream ss;
ss << "job_" << appId.getClusterTimestamp() << "_" << appId.getId();
jobId = ss.str();
- LOG(INFO,"LibYarnClient::createJob,
appId[cluster_timestamp:%lld,id:%d]",
- clientAppId.getClusterTimestamp(),
clientAppId.getId());
+ LOG(INFO, "LibYarnClient::createJob,
appId[cluster_timestamp:%lld,id:%d]",
+ clientAppId.getClusterTimestamp(), clientAppId.getId());
clientJobId = jobId;
return FR_SUCCEEDED;
} catch (const YarnNetworkConnectException &e) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::createJob, catch network connection
exception:" << e.what();
- setErrorMessage(errorMsg.str());
- return FR_FAILED;
- } catch (const std::exception &e) {
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::createJob, catch network connection
exception:" << e.what();
+ setErrorMessage(errorMsg.str());
+ return FR_FAILED;
+ } catch (const std::exception &e) {
stringstream errorMsg;
errorMsg << "LibYarnClient::createJob, catch exception:" << e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::createJob, catch unexpected
exception.";
- setErrorMessage(errorMsg.str());
- return FR_FAILED;
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::createJob, catch unexpected exception.";
+ setErrorMessage(errorMsg.str());
+ return FR_FAILED;
}
}
int LibYarnClient::forceKillJob(string &jobId) {
#ifndef MOCKTEST
- if ( keepRun ) {
+ if (keepRun) {
keepRun = false;
void *thrc = NULL;
int rc = pthread_join(heartbeatThread, &thrc);
- if ( rc != 0 ) {
- LOG(INFO, "LibYarnClient::forceKillJob, fail to join heart-beat
thread. "
- "error code %d", rc);
+ if (rc != 0) {
+ LOG(WARNING, "LibYarnClient::forceKillJob, fail to join heart-beat
thread. "
+ "error code %d", rc);
return FR_FAILED;
} else {
LOG(INFO, "LibYarnClient::forceKillJob, join heart-beat thread
successfully.");
@@ -294,27 +294,30 @@ int LibYarnClient::forceKillJob(string &jobId) {
}
needHeartbeatAlive = false;
- for (map<int64_t,Container*>::iterator it = jobIdContainers.begin();
it != jobIdContainers. end(); it++) {
+ for (map<int64_t, Container*>::iterator it = jobIdContainers.begin();
+ it != jobIdContainers.end(); it++) {
ostringstream key;
Container *container = it->second;
key << container->getNodeId().getHost() << ":" <<
container->getNodeId().getPort();
Token nmToken = nmTokenCache[key.str()];
- ((ContainerManagement*)nmClient)->stopContainer((*container),
nmToken);
- LOG(INFO,"LibYarnClient::forceKillJob, container:%ld is
stopped",container->getId().getId());
+ ((ContainerManagement*) nmClient)->stopContainer((*container),
nmToken);
+ LOG(DEBUG1, "LibYarnClient::forceKillJob, container:%ld is
stopped",container->getId().getId());
}
((ApplicationClient*) appClient)->forceKillApplication(clientAppId);
- LOG(INFO, "LibYarnClient::forceKillJob, forceKillApplication");
+ LOG(INFO, "LibYarnClient::force to kill this application.");
- for (map<int64_t,Container*>::iterator it = jobIdContainers.begin();
it != jobIdContainers.end(); it++) {
- LOG(INFO,"LibYarnClient::forceKillJob, container:%ld in
jobIdContainers is deleted",it->second->getId().getId());
+ for (map<int64_t,Container*>::iterator it = jobIdContainers.begin();
+ it != jobIdContainers.end(); it++) {
+ LOG(DEBUG1, "LibYarnClient::forceKillJob, container:%ld in
jobIdContainers is deleted",
+ it->second->getId().getId());
delete it->second;
it->second = NULL;
}
jobIdContainers.clear();
activeFailContainerIds.clear();
return FR_SUCCEEDED;
- } catch(std::exception& e){
+ } catch (std::exception& e) {
stringstream errorMsg;
errorMsg << "LibYarnClient::forceKillJob, catch the exception:" <<
e.what();
setErrorMessage(errorMsg.str());
@@ -331,82 +334,77 @@ int LibYarnClient::forceKillJob(string &jobId) {
void LibYarnClient::dummyAllocate() {
pthread_mutex_lock(&heartbeatLock);
- ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient;
-
- //1) requestProto_blank
- list<ResourceRequest> asksBlank;
- //2) releasesBlank
- list<ContainerId> releasesBlank;
- //3) blacklistRequestBlank
- ResourceBlacklistRequest blacklistRequestBlank;
- //4) progress
- float progress = 0.5;
- int allocatedNum = 0;
-
- try {
- LOG(INFO, "LibYarnClient::dummyAllocate, do a AM-RM heartbeat
with response_id:%d", response_id);
- AllocateResponse response = amrmClientAlias->allocate(asksBlank,
-
releasesBlank,
-
blacklistRequestBlank,
-
response_id,
-
progress);
- response_id = response.getResponseId();
- list<Container> allocatedContainers =
response.getAllocatedContainers();
- allocatedNum = allocatedContainers.size();
- LOG(INFO,"LibYarnClient::dummyAllocate returned response_id
:%d", response_id);
- if (allocatedNum > 0) {
- /*
- * In rare case, client gets allocated containers in
heartbeat,
- * free them immediately.
- */
- LOG(INFO, "LibYarnClient::dummyAllocate returned
allocated size: %d, "
- "free them immediately.",
allocatedNum);
- list<ContainerId> releases;
- for (list<Container>::iterator it =
allocatedContainers.begin();
- it != allocatedContainers.end(); it++) {
- releases.push_back((*it).getId());
- }
- list<ResourceRequest> asksBlank;
- ResourceBlacklistRequest blacklistRequestBlank;
- response = amrmClientAlias->allocate(asksBlank,
releases,
-
blacklistRequestBlank, response_id, progress);
- response_id = response.getResponseId();
- }
- pthread_mutex_unlock(&heartbeatLock);
- }
- catch (const YarnException &e) {
- LOG(WARNING, "LibYarnClient::dummyAllocate, dummy allocation "
- "is not correctly executed with
exception raised. %s",
- e.msg());
- pthread_mutex_unlock(&heartbeatLock);
- throw;
- }
+ ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient;
+
+ //1) requestProto_blank
+ list<ResourceRequest> asksBlank;
+ //2) releasesBlank
+ list<ContainerId> releasesBlank;
+ //3) blacklistRequestBlank
+ ResourceBlacklistRequest blacklistRequestBlank;
+ //4) progress
+ float progress = 0.5;
+ int allocatedNum = 0;
+
+ try {
+ LOG(DEBUG1, "LibYarnClient::dummyAllocate, do a AM-RM heartbeat with
response_id:%d", response_id);
+ AllocateResponse response = amrmClientAlias->allocate(asksBlank,
+ releasesBlank, blacklistRequestBlank, response_id, progress);
+ response_id = response.getResponseId();
+ list<Container> allocatedContainers =
response.getAllocatedContainers();
+ allocatedNum = allocatedContainers.size();
+ LOG(DEBUG1, "LibYarnClient::dummyAllocate returned response_id :%d",
response_id);
+ if (allocatedNum > 0) {
+ /*
+ * In rare case, client gets allocated containers in heartbeat,
+ * free them immediately.
+ */
+ LOG(DEBUG1, "LibYarnClient::dummyAllocate returned allocated size:
%d, "
+ "free them immediately.", allocatedNum);
+ list<ContainerId> releases;
+ for (list<Container>::iterator it = allocatedContainers.begin();
+ it != allocatedContainers.end(); it++) {
+ releases.push_back((*it).getId());
+ }
+ list<ResourceRequest> asksBlank;
+ ResourceBlacklistRequest blacklistRequestBlank;
+ response = amrmClientAlias->allocate(asksBlank, releases,
+ blacklistRequestBlank, response_id, progress);
+ response_id = response.getResponseId();
+ }
+ pthread_mutex_unlock(&heartbeatLock);
+ }
+ catch (const YarnException &e) {
+ LOG(WARNING, "LibYarnClient::dummyAllocate, dummy allocation "
+ "is not correctly executed with exception raised. %s",
+ e.msg());
+ pthread_mutex_unlock(&heartbeatLock);
+ throw;
+ }
}
void LibYarnClient::addResourceRequest(Resource capability,
-
int32_t num_containers,
-
string host,
-
int32_t priority,
- bool
relax_locality)
+ int32_t num_containers, string host, int32_t priority,
+ bool relax_locality)
{
- ResourceRequest *req = new ResourceRequest();
- req->setCapability(capability);
- req->setNumContainers(num_containers);
- Priority priorityProto;
- priorityProto.setPriority(priority);
- req->setPriority(priorityProto);
- req->setResourceName(host);
- req->setRelaxLocality(relax_locality);
- try {
- askRequests.push_back(*req);
- LOG(INFO, "LibYarnClient::put a request into ask list, "
- "mem:%d, cpu:%d, priority:%d, resource
name:%s, relax:%d, num_containers:%d",
- capability.getMemory(),
capability.getVirtualCores(), priority, host.c_str(),
- relax_locality, num_containers);
- } catch (std::exception &e) {
- LOG(WARNING, "LibYarnClient::Fail to add a resource request "
- "to ask list. %s ", e.what());
- }
+ ResourceRequest *req = new ResourceRequest();
+ req->setCapability(capability);
+ req->setNumContainers(num_containers);
+ Priority priorityProto;
+ priorityProto.setPriority(priority);
+ req->setPriority(priorityProto);
+ req->setResourceName(host);
+ req->setRelaxLocality(relax_locality);
+ try {
+ askRequests.push_back(*req);
+ LOG(DEBUG1, "LibYarnClient::put a request into ask list, "
+ "mem:%d, cpu:%d, priority:%d, resource name:%s, relax:%d,
num_containers:%d",
+ capability.getMemory(), capability.getVirtualCores(),
priority, host.c_str(),
+ relax_locality, num_containers);
+ } catch (std::exception &e) {
+ LOG(WARNING, "LibYarnClient::Fail to add a resource request "
+ "to ask list. %s ", e.what());
+ }
}
/*
@@ -416,84 +414,85 @@ void LibYarnClient::addResourceRequest(Resource
capability,
* is supported.
*
* Parameters:
- * jobId: jobId
- * capability: the quota of the resource
- * count: the required number of the containers
- * preferred: node list, NULL means ANY host. If one node's rack
name is NULL, a default rack name is set.
- * priority: priority
+ * jobId: jobId
+ * capability: the quota of the resource
+ * count: the required number of the containers
+ * preferred: node list, NULL means ANY host. If one node's rack name is
NULL, a default rack name is set.
+ * priority: priority
*/
-int LibYarnClient::addContainerRequests(string &jobId, Resource &capability,
int32_t num_containers,
-
list<LibYarnNodeInfo> &preferred, int32_t priority, bool relax_locality)
+int LibYarnClient::addContainerRequests(string &jobId, Resource &capability,
+ int32_t num_containers, list<LibYarnNodeInfo> &preferred,
+ int32_t priority, bool relax_locality)
{
- try {
- if (jobId != clientJobId) {
- throw std::invalid_argument("The jobId is wrong, check
the jobId argument");
- }
-
- map<string, int32_t> inferredRacks;
-
- for (list<LibYarnNodeInfo>::iterator iter = preferred.begin();
- iter != preferred.end(); iter++) {
- LOG(INFO, "LibYarnClient::addContainerRequests, "
- "get a preferred host info,
host:%s,rack:%s,container number:%d",
- iter->getHost().c_str(),
iter->getRack().c_str(), iter->getContainerNum());
- /* add a resource request for this node */
- addResourceRequest(capability, iter->getContainerNum(),
iter->getHost(), priority, true);
- map<string, int32_t>:: iterator it =
inferredRacks.find(iter->getRack());
- if (it != inferredRacks.end())
- it->second += iter->getContainerNum();
- else
- inferredRacks.insert(make_pair(iter->getRack(),
iter->getContainerNum()));
- }
-
- /* add resource requests for racks*/
- for (map<string, int32_t>:: iterator it = inferredRacks.begin()
;
- it != inferredRacks.end(); it++)
- addResourceRequest(capability, it->second, it->first,
priority, relax_locality);
-
- /* add resource request for off-switch */
- addResourceRequest(capability, num_containers, YARN_HOST_ANY,
priority, relax_locality);
-
- return FR_SUCCEEDED;
- } catch (std::exception &e) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::addContainerRequests catch std exception:"
<< e.what();
+ try {
+ if (jobId != clientJobId) {
+ throw std::invalid_argument("The jobId is wrong, check the jobId
argument");
+ }
+
+ map<string, int32_t> inferredRacks;
+
+ for (list<LibYarnNodeInfo>::iterator iter = preferred.begin();
+ iter != preferred.end(); iter++) {
+ LOG(DEBUG1, "LibYarnClient::addContainerRequests, "
+ "get a preferred host info, host:%s,rack:%s,container
number:%d",
+ iter->getHost().c_str(), iter->getRack().c_str(),
iter->getContainerNum());
+ /* add a resource request for this node */
+ addResourceRequest(capability, iter->getContainerNum(),
iter->getHost(), priority, true);
+ map<string, int32_t>::iterator it =
inferredRacks.find(iter->getRack());
+ if (it != inferredRacks.end())
+ it->second += iter->getContainerNum();
+ else
+ inferredRacks.insert(make_pair(iter->getRack(),
iter->getContainerNum()));
+ }
+
+ /* add resource requests for racks*/
+ for (map<string, int32_t>::iterator it = inferredRacks.begin();
+ it != inferredRacks.end(); it++) {
+ addResourceRequest(capability, it->second, it->first, priority,
relax_locality);
+ }
+
+ /* add resource request for off-switch */
+ addResourceRequest(capability, num_containers, YARN_HOST_ANY,
priority, relax_locality);
+
+ return FR_SUCCEEDED;
+ } catch (std::exception &e) {
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::addContainerRequests catch std exception:"
<< e.what();
setErrorMessage(errorMsg.str());
- return FR_FAILED;
- } catch (...) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::addContainerRequests catch unexpected
exception.";
+ return FR_FAILED;
+ } catch (...) {
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::addContainerRequests catch unexpected
exception.";
setErrorMessage(errorMsg.str());
- return FR_FAILED;
- }
+ return FR_FAILED;
+ }
}
/*
-message AllocateRequestProto {
-repeated ResourceRequestProto ask = 1;
-repeated ContainerIdProto release = 2;
-optional ResourceBlacklistRequestProto blacklist_request = 3;
-optional int32 response_id = 4;
-optional float progress = 5;
-}
-
-message AllocateResponseProto {
-optional AMCommandProto a_m_command = 1;
-optional int32 response_id = 2;
-repeated ContainerProto allocated_containers = 3;
-repeated ContainerStatusProto completed_container_statuses = 4;
-optional ResourceProto limit = 5;
-repeated NodeReportProto updated_nodes = 6;
-optional int32 num_cluster_nodes = 7;
-optional PreemptionMessageProto preempt = 8;
-repeated NMTokenProto nm_tokens = 9;
-}
-*/
+ message AllocateRequestProto {
+ repeated ResourceRequestProto ask = 1;
+ repeated ContainerIdProto release = 2;
+ optional ResourceBlacklistRequestProto blacklist_request = 3;
+ optional int32 response_id = 4;
+ optional float progress = 5;
+ }
+
+ message AllocateResponseProto {
+ optional AMCommandProto a_m_command = 1;
+ optional int32 response_id = 2;
+ repeated ContainerProto allocated_containers = 3;
+ repeated ContainerStatusProto completed_container_statuses = 4;
+ optional ResourceProto limit = 5;
+ repeated NodeReportProto updated_nodes = 6;
+ optional int32 num_cluster_nodes = 7;
+ optional PreemptionMessageProto preempt = 8;
+ repeated NMTokenProto nm_tokens = 9;
+ }
+ */
int LibYarnClient::allocateResources(string &jobId,
- list<string> &blackListAdditions,
- list<string> &blackListRemovals,
- list<Container> &allocatedContainers,
- int32_t num_containers) {
+ list<string> &blackListAdditions, list<string> &blackListRemovals,
+ list<Container> &allocatedContainers, int32_t num_containers)
+{
try{
AllocateResponse response;
int retry = 5;
@@ -509,8 +508,8 @@ int LibYarnClient::allocateResources(string &jobId,
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat
thread has stopped.");
}
- ApplicationMaster* amrmClientAlias = (ApplicationMaster*)
amrmClient;
- list<Container> allocatedContainerCache;
+ ApplicationMaster* amrmClientAlias = (ApplicationMaster*) amrmClient;
+ list<Container> allocatedContainerCache;
list<ContainerReport> preContainerReports;
preContainerReports = ((ApplicationClient*)
appClient)->getContainers(clientAppAttempId);
@@ -520,15 +519,17 @@ int LibYarnClient::allocateResources(string &jobId,
blacklistRequest.setBlacklistRemovals(blackListRemovals);
float progress = 0.5;
- LOG(INFO,"LibYarnClient::allocate, ask: container number:%d,",
num_containers);
+ LOG(DEBUG1, "LibYarnClient::request to allocate resource, container
number:%d",
+ num_containers);
while (retry > 0) {
- LOG(INFO,"LibYarnClient::allocate with response id : %d",
response_id);
- AllocateResponse response =
amrmClientAlias->allocate(this->askRequests, releasesBlank,
- blacklistRequest, response_id, progress);
+ LOG(DEBUG1, "LibYarnClient::allocate with response id : %d",
response_id);
+ AllocateResponse response = amrmClientAlias->allocate(
+ this->askRequests, releasesBlank, blacklistRequest,
+ response_id, progress);
response_id = response.getResponseId();
- LOG(INFO,"LibYarnClient::allocate returned response id : %d",
response_id);
- list<NMToken> nmTokens = response.getNMTokens();
+ LOG(DEBUG1, "LibYarnClient::allocate returned response id : %d",
response_id);
+ list<NMToken> nmTokens = response.getNMTokens();
for (list<NMToken>::iterator it = nmTokens.begin(); it !=
nmTokens.end(); it++) {
std::ostringstream oss;
oss << (*it).getNodeId().getHost() << ":" <<
(*it).getNodeId().getPort();
@@ -538,7 +539,7 @@ int LibYarnClient::allocateResources(string &jobId,
list<Container> allocatedContainerOnce =
response.getAllocatedContainers();
allocatedNumOnce = allocatedContainerOnce.size();
if (allocatedNumOnce <= 0) {
- LOG(WARNING, "LibYarnClient:: fail to allocate from YARN RM,
try again");
+ LOG(DEBUG1, "LibYarnClient:: fail to allocate from YARN RM,
try again");
retry--;
if(retry == 0 && allocatedNumTotal == 0) {
/* If failed, just return to Resource Broker to handle*/
@@ -549,10 +550,10 @@ int LibYarnClient::allocateResources(string &jobId,
} else {
allocatedNumTotal += allocatedNumOnce;
allocatedContainerCache.insert(allocatedContainerCache.end(),
allocatedContainerOnce.begin(), allocatedContainerOnce.end());
- LOG(INFO, "LibYarnClient:: allocate %d containers from YARN
RM", allocatedNumOnce);
+ LOG(DEBUG1, "LibYarnClient:: allocate %d containers from YARN
RM", allocatedNumOnce);
if (allocatedNumTotal >= num_containers) {
- LOG(INFO, "LibYarnClient:: allocate enough containers from
YARN RM, "
- "expected:%d, total:%d", num_containers,
allocatedNumTotal);
+ LOG(DEBUG1, "LibYarnClient:: allocate enough containers
from YARN RM, "
+ "expected:%d, total:%d", num_containers,
allocatedNumTotal);
break;
}
@@ -560,39 +561,43 @@ int LibYarnClient::allocateResources(string &jobId,
usleep(TimeInterval::ALLOCATE_INTERVAL_MS);
}
- LOG(INFO,"LibYarnClient::allocate, ask: response_id:%d, allocated
container number:%d",
+ LOG(INFO,"LibYarnClient::allocate resource, response_id:%d, allocated
container number:%d",
response_id, allocatedNumTotal);
/* a workaround for allocate more container than request */
list<ContainerId> releases;
list<ContainerReport> afterContainerReports;
afterContainerReports = ((ApplicationClient*)
appClient)->getContainers(clientAppAttempId);
- for (list<ContainerReport>::iterator ait=afterContainerReports.begin();
- ait!=afterContainerReports.end(); ait++){
+ for (list<ContainerReport>::iterator ait =
afterContainerReports.begin();
+ ait != afterContainerReports.end(); ait++) {
bool foundInPre = false;
- for (list<ContainerReport>::iterator
pit=preContainerReports.begin();pit!=preContainerReports.end();pit++){
+ for (list<ContainerReport>::iterator pit =
+ preContainerReports.begin();
+ pit != preContainerReports.end(); pit++) {
if (pit->getId().getId() == ait->getId().getId()) {
foundInPre = true;
break;
}
}
- if (!foundInPre){
+ if (!foundInPre) {
bool foundInNewAllocated = false;
- for (list<Container>::iterator cit =
allocatedContainerCache.begin();cit!=allocatedContainerCache.end();cit++){
- if(cit->getId().getId() == ait->getId().getId()){
+ for (list<Container>::iterator cit =
+ allocatedContainerCache.begin();
+ cit != allocatedContainerCache.end(); cit++) {
+ if (cit->getId().getId() == ait->getId().getId()) {
foundInNewAllocated = true;
break;
}
}
- if (!foundInNewAllocated){
+ if (!foundInNewAllocated) {
releases.push_back((*ait).getId());
}
}
}
int totalNeedRelease = allocatedContainerCache.size() - num_containers;
- LOG(INFO,"LibYarnClient::allocateResources, ask: finished:
total_allocated_containers:%ld, total_need_release:%d",
- allocatedContainerCache.size(), totalNeedRelease);
+ LOG(DEBUG1, "LibYarnClient::allocateResources,
total_allocated_containers:%ld, total_need_release:%d",
+ allocatedContainerCache.size(), totalNeedRelease);
if(totalNeedRelease > 0) {
for (int i = 0; i < totalNeedRelease; i++) {
list<Container>::iterator it = allocatedContainerCache.begin();
@@ -608,15 +613,16 @@ int LibYarnClient::allocateResources(string &jobId,
}
/* 3. store allocated containers */
- for(list<Container>::iterator it = allocatedContainerCache.begin();it
!= allocatedContainerCache.end();it++){
+ for (list<Container>::iterator it = allocatedContainerCache.begin();
+ it != allocatedContainerCache.end(); it++) {
Container *container = new Container((*it));
int64_t containerId = container->getId().getId();
jobIdContainers[containerId] = container;
}
allocatedContainers = allocatedContainerCache;
- LOG(INFO,"LibYarnClient::allocateResources, put all allocated
containers size:%ld",
- allocatedContainerCache.size());
+ LOG(DEBUG1, "LibYarnClient::allocateResources, put all allocated
containers size:%ld",
+ allocatedContainerCache.size());
pthread_mutex_unlock(&heartbeatLock);
@@ -649,12 +655,13 @@ int LibYarnClient::allocateResources(string &jobId,
}
}
-int LibYarnClient::releaseResources(string &jobId,int64_t
releaseContainerIds[],int releaseContainerSize) {
+int LibYarnClient::releaseResources(string &jobId,int64_t
releaseContainerIds[], int releaseContainerSize)
+{
try{
pthread_mutex_lock(&heartbeatLock);
- if (jobId != clientJobId) {
- throw std::invalid_argument("The jobId is wrong,please
check the jobId argument");
- }
+ if (jobId != clientJobId) {
+ throw std::invalid_argument("The jobId is wrong,please check the
jobId argument");
+ }
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat
thread has stopped.");
@@ -665,9 +672,9 @@ int LibYarnClient::releaseResources(string &jobId,int64_t
releaseContainerIds[],
list<ResourceRequest> asksBlank;
//2) releases
list<ContainerId> releases;
- for (int i = 0;i < releaseContainerSize;i++){
+ for (int i = 0; i < releaseContainerSize; i++) {
int64_t containerId = releaseContainerIds[i];
- map<int64_t,Container*>::iterator it =
jobIdContainers.find(containerId);
+ map<int64_t, Container*>::iterator it =
jobIdContainers.find(containerId);
if (it != jobIdContainers.end()) {
releases.push_back((it->second)->getId());
}
@@ -677,83 +684,85 @@ int LibYarnClient::releaseResources(string &jobId,int64_t
releaseContainerIds[],
//4) progress
float progress = 0.5;
- LOG(INFO, "LibYarnClient::releaseResource, release
size:%d",releases.size());
- AllocateResponse response = amrmClientAlias->allocate(asksBlank,
releases,
- blacklistRequestBlank, response_id, progress);
+ AllocateResponse response = amrmClientAlias->allocate(asksBlank,
+ releases, blacklistRequestBlank, response_id, progress);
response_id = response.getResponseId();
//erase from the map jobIdContainers
- for (list<ContainerId>::iterator it = releases.begin();it !=
releases.end();it++){
- LOG(INFO, "LibYarnClient::releaseResource, released
ContainerId:%ld",it->getId());
- map<int64_t,Container*>::iterator cit =
jobIdContainers.find(it->getId());
- if (cit != jobIdContainers.end()){
+ for (list<ContainerId>::iterator it = releases.begin(); it !=
releases.end(); it++) {
+ LOG(DEBUG1, "LibYarnClient::releaseResource, released
ContainerId:%ld",
+ it->getId());
+ map<int64_t, Container*>::iterator cit =
jobIdContainers.find(it->getId());
+ if (cit != jobIdContainers.end()) {
delete cit->second;
cit->second = NULL;
jobIdContainers.erase(it->getId());
}
//erase the element if in activeFailContainers
set<int64_t>::iterator sit =
activeFailContainerIds.find(it->getId());
- if(sit != activeFailContainerIds.end()){
- LOG(INFO, "LibYarnClient::releaseResource, remove %ld from
activeFailContainerIds",(*sit));
+ if (sit != activeFailContainerIds.end()) {
+ LOG(INFO, "LibYarnClient::releaseResource, remove %ld from
activeFailContainerIds",
+ (*sit));
activeFailContainerIds.erase(*sit);
}
}
- LOG(INFO, "LibYarnClient::releaseResources, release complete");
+ LOG(INFO, "LibYarnClient::release resources, container number:%d",
+ releases.size());
pthread_mutex_unlock(&heartbeatLock);
return FR_SUCCEEDED;
- } catch(std::exception &e) {
+ } catch (std::exception &e) {
stringstream errorMsg;
errorMsg << "LibYarnClient::releaseResources, catch exception:" <<
e.what();
setErrorMessage(errorMsg.str());
pthread_mutex_unlock(&heartbeatLock);
return FR_FAILED;
} catch (...) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::releaseResources, catch unexpected
exception.";
- setErrorMessage(errorMsg.str());
- pthread_mutex_unlock(&heartbeatLock);
- return FR_FAILED;
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::releaseResources, catch unexpected
exception.";
+ setErrorMessage(errorMsg.str());
+ pthread_mutex_unlock(&heartbeatLock);
+ return FR_FAILED;
}
}
/*
----------------------
-message StartContainerRequestProto {
- optional ContainerLaunchContextProto container_launch_context = 1;
- optional hadoop.common.TokenProto container_token = 2;
-}
-
-message StartContainerResponseProto {
- repeated StringBytesMapProto services_meta_data = 1;
-}
----------------------
-rpc startContainers(StartContainersRequestProto) returns
(StartContainersResponseProto);
----------------------
-message StartContainersRequestProto {
- repeated StartContainerRequestProto start_container_request = 1;
-}
-
-message StartContainersResponseProto {
- repeated StringBytesMapProto services_meta_data = 1;
- repeated ContainerIdProto succeeded_requests = 2;
- repeated ContainerExceptionMapProto failed_requests = 3;
-}
-*/
+ ---------------------
+ message StartContainerRequestProto {
+ optional ContainerLaunchContextProto container_launch_context = 1;
+ optional hadoop.common.TokenProto container_token = 2;
+ }
+
+ message StartContainerResponseProto {
+ repeated StringBytesMapProto services_meta_data = 1;
+ }
+ ---------------------
+ rpc startContainers(StartContainersRequestProto) returns
(StartContainersResponseProto);
+ ---------------------
+ message StartContainersRequestProto {
+ repeated StartContainerRequestProto start_container_request = 1;
+ }
+
+ message StartContainersResponseProto {
+ repeated StringBytesMapProto services_meta_data = 1;
+ repeated ContainerIdProto succeeded_requests = 2;
+ repeated ContainerExceptionMapProto failed_requests = 3;
+ }
+ */
int LibYarnClient::activeResources(string &jobId,int64_t
activeContainerIds[],int activeContainerSize) {
try{
- if (jobId != clientJobId) {
- throw std::invalid_argument("The jobId is wrong,please
check the jobId argument");
- }
+ if (jobId != clientJobId) {
+ throw std::invalid_argument("The jobId is wrong,please check the
jobId argument");
+ }
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat
thread has stopped.");
}
- LOG(INFO, "LibYarnClient::activeResources, activeResources
started");
+ LOG(DEBUG1, "LibYarnClient::activeResources, activeResources started");
- for (int i = 0; i < activeContainerSize; i++){
+ for (int i = 0; i < activeContainerSize; i++) {
int64_t containerId = activeContainerIds[i];
map<int64_t, Container*>::iterator it =
jobIdContainers.find(containerId);
if (it != jobIdContainers.end()) {
- try{
+ try {
Container *container = it->second;
ostringstream key;
key << container->getNodeId().getHost() << ":" <<
container->getNodeId().getPort();
@@ -768,27 +777,29 @@ int LibYarnClient::activeResources(string &jobId,int64_t
activeContainerIds[],in
request.setContainerLaunchCtx(ctx);
Token cToken = container->getContainerToken();
request.setContainerToken(cToken);
- LOG(INFO, "LibYarnClient::activeResources active
containerId:%ld", containerId);
+ LOG(DEBUG1, "LibYarnClient::activeResources active
containerId:%ld", containerId);
((ContainerManagement*)nmClient)->startContainer((*container), request,
nmToken);
- } catch(std::exception& e){
- LOG(INFO, "LibYarnClient::activeResources,
activeResources Failed Id:%ld,exception:%s",containerId,e.what());
+ } catch (std::exception& e) {
+ LOG(WARNING, "LibYarnClient::activeResources,
activeResources Failed Id:%ld,exception:%s",
+ containerId,e.what());
activeFailContainerIds.insert(containerId);
}
}
}
- //using namespace Yarn::Internal;
- LOG(INFO, "LibYarnClient::activeResources, activeResources
finished");
+
+ LOG(INFO, "LibYarnClient::active resources, container number:%d",
+ activeContainerSize);
return FR_SUCCEEDED;
- } catch(std::exception& e){
+ } catch (std::exception& e) {
stringstream errorMsg;
errorMsg << "LibYarnClient::activeResources, Catch the Exception:" <<
e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::activeResources, catch unexpected
exception.";
- setErrorMessage(errorMsg.str());
- return FR_FAILED;
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::activeResources, catch unexpected
exception.";
+ setErrorMessage(errorMsg.str());
+ return FR_FAILED;
}
}
int LibYarnClient::getActiveFailContainerIds(set<int64_t> &activeFailIds){
@@ -799,14 +810,14 @@ int LibYarnClient::getActiveFailContainerIds(set<int64_t>
&activeFailIds){
int LibYarnClient::finishJob(string &jobId, FinalApplicationStatus
finalStatus) {
#ifndef MOCKTEST
- if ( keepRun ) {
+ if (keepRun) {
// No need to run heart-beat thread now.
keepRun = false;
void *thrc = NULL;
int rc = pthread_join(heartbeatThread, &thrc);
- if ( rc != 0 ) {
- LOG(INFO, "LibYarnClient::finishJob, fail to join heart-beat
thread. "
- "error code %d", rc);
+ if (rc != 0) {
+ LOG(WARNING, "LibYarnClient::finishJob, fail to join heart-beat
thread. "
+ "error code %d", rc);
return FR_FAILED;
} else {
LOG(INFO, "LibYarnClient::finishJob, join heart-beat thread
successfully.");
@@ -814,30 +825,31 @@ int LibYarnClient::finishJob(string &jobId,
FinalApplicationStatus finalStatus)
}
#endif
- try{
+ try {
if (jobId != clientJobId) {
throw std::invalid_argument("The jobId is wrong,please check the
jobId argument");
}
needHeartbeatAlive = false;
//1. we should stop all containers related with this job
- for (map<int64_t,Container*>::iterator it = jobIdContainers.begin();
it != jobIdContainers. end(); it++) {
+ for (map<int64_t, Container*>::iterator it = jobIdContainers.begin();
it != jobIdContainers. end(); it++) {
ostringstream key;
Container *container = it->second;
key << container->getNodeId().getHost() << ":" <<
container->getNodeId().getPort();
Token nmToken = nmTokenCache[key.str()];
- ((ContainerManagement*)nmClient)->stopContainer((*container),
nmToken);
- LOG(INFO,"LibYarnClient::finishJob, container:%ld is
stopped",container->getId().getId());
+ ((ContainerManagement*) nmClient)->stopContainer((*container),
nmToken);
+ LOG(DEBUG1, "LibYarnClient::finishJob, container:%ld is
stopped",container->getId().getId());
}
- LOG(INFO,"LibYarnClient::finishJob, all containers for jobId:%s are
stopped",jobId.c_str());
+ LOG(DEBUG1, "LibYarnClient::finishJob, all containers for jobId:%s are
stopped",jobId.c_str());
//2. finish AM
string diagnostics("");
string tracking_url("");
((ApplicationMaster*)
amrmClient)->finishApplicationMaster(diagnostics, tracking_url, finalStatus);
LOG(INFO, "LibYarnClient::finishJob, finish AM for jobId:%s,
finalStatus:%d", jobId.c_str(), finalStatus);
//free the Container* memory
- for (map<int64_t,Container*>::iterator it = jobIdContainers.begin();
it != jobIdContainers.end(); it++) {
- LOG(INFO,"LibYarnClient::finishJob, container:%ld in
jobIdContainers are delete",it->second->getId().getId());
+ for (map<int64_t, Container*>::iterator it = jobIdContainers.begin();
it != jobIdContainers.end(); it++) {
+ LOG(DEBUG1, "LibYarnClient::finishJob, container:%ld in
jobIdContainers is deleted",
+ it->second->getId().getId());
delete it->second;
it->second = NULL;
}
@@ -847,154 +859,150 @@ int LibYarnClient::finishJob(string &jobId,
FinalApplicationStatus finalStatus)
return FR_SUCCEEDED;
} catch (std::exception& e) {
stringstream errorMsg;
-
errorMsg << "LibYarnClient::finishJob, catch the Exception:" <<
e.what();
setErrorMessage(errorMsg.str());
-
return FR_FAILED;
} catch (const ApplicationMasterNotRegisteredException &e) {
stringstream errorMsg;
-
errorMsg << "LibYarnClient::finishJob, "
"catch ApplicationMasterNotRegisteredException." <<
e.what();
setErrorMessage(errorMsg.str());
-
return FR_FAILED;
} catch (...) {
stringstream errorMsg;
-
errorMsg << "LibYarnClient::finishJob, catch unexpected exception.";
setErrorMessage(errorMsg.str());
-
return FR_FAILED;
}
}
-int LibYarnClient::getApplicationReport(string &jobId,ApplicationReport
&applicationReport){
- try {
- if (jobId != clientJobId) {
- throw std::invalid_argument("The jobId is wrong,please
check the jobId argument");
- }
+int LibYarnClient::getApplicationReport(string &jobId,ApplicationReport
&applicationReport) {
+ try {
+ if (jobId != clientJobId) {
+ throw std::invalid_argument("The jobId is wrong,please check the
jobId argument");
+ }
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat
thread has stopped.");
}
- LOG(INFO,"LibYarnClient::getApplicationReport,
appId[cluster_timestamp:%lld,id:%d]",
- clientAppId.getClusterTimestamp(),
clientAppId.getId());
- applicationReport = ((ApplicationClient*)
appClient)->getApplicationReport(clientAppId);
- LOG(INFO,"LibYarnClient::getApplicationReport,
appId[cluster_timestamp:%lld,id:%d],getCurrentAppAttemptId:%d",
-
applicationReport.getApplicationId().getClusterTimestamp(),
- applicationReport.getApplicationId().getId(),
-
applicationReport.getCurrentAppAttemptId().getAttemptId());
+ LOG(DEBUG1, "LibYarnClient::getApplicationReport,
appId[cluster_timestamp:%lld,id:%d]",
+ clientAppId.getClusterTimestamp(), clientAppId.getId());
+ applicationReport = ((ApplicationClient*)
appClient)->getApplicationReport(clientAppId);
+ LOG(DEBUG1, "LibYarnClient::getApplicationReport,
appId[cluster_timestamp:%lld,id:%d],getCurrentAppAttemptId:%d",
+ applicationReport.getApplicationId().getClusterTimestamp(),
+ applicationReport.getApplicationId().getId(),
+ applicationReport.getCurrentAppAttemptId().getAttemptId());
- return FR_SUCCEEDED;
+ return FR_SUCCEEDED;
} catch (std::exception& e) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::getApplicationReport, Catch the
Exception:"
- << e.what();
- setErrorMessage(errorMsg.str());
- return FR_FAILED;
- } catch (...) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::getApplicationReport, catch
unexpected exception.";
- setErrorMessage(errorMsg.str());
- return FR_FAILED;
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::getApplicationReport, Catch the Exception:"
+ << e.what();
+ setErrorMessage(errorMsg.str());
+ return FR_FAILED;
+ } catch (...) {
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::getApplicationReport, catch unexpected
exception.";
+ setErrorMessage(errorMsg.str());
+ return FR_FAILED;
}
}
-int LibYarnClient::getContainerReports(string &jobId,list<ContainerReport>
&containerReports){
- try {
- if (jobId != clientJobId) {
- throw std::invalid_argument("The jobId is wrong,please
check the jobId argument");
- }
+int LibYarnClient::getContainerReports(string &jobId,list<ContainerReport>
&containerReports) {
+ try {
+ if (jobId != clientJobId) {
+ throw std::invalid_argument("The jobId is wrong,please check the
jobId argument");
+ }
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat
thread has stopped.");
}
- LOG(INFO,"LibYarnClient::getContainerReports,
appId[cluster_timestamp:%lld,id:%d]",
- clientAppId.getClusterTimestamp(),
clientAppId.getId());
- containerReports = ((ApplicationClient*)
appClient)->getContainers(clientAppAttempId);
- return FR_SUCCEEDED;
- } catch (std::exception& e) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::getContainerReports, Catch the
Exception:" << e.what();
- setErrorMessage(errorMsg.str());
- return FR_FAILED;
- } catch (...) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::getContainerReports, catch
unexpected exception.";
- setErrorMessage(errorMsg.str());
- return FR_FAILED;
+ LOG(DEBUG1, "LibYarnClient::getContainerReports,
appId[cluster_timestamp:%lld,id:%d]",
+ clientAppId.getClusterTimestamp(), clientAppId.getId());
+ containerReports = ((ApplicationClient*)
appClient)->getContainers(clientAppAttempId);
+ return FR_SUCCEEDED;
+ } catch (std::exception& e) {
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::getContainerReports, Catch the Exception:"
<< e.what();
+ setErrorMessage(errorMsg.str());
+ return FR_FAILED;
+ } catch (...) {
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::getContainerReports, catch unexpected
exception.";
+ setErrorMessage(errorMsg.str());
+ return FR_FAILED;
}
}
-int LibYarnClient::getContainerStatuses(string &jobId,int64_t
containerIds[],int containerSize,
- list<ContainerStatus>
&containerStatues){
- try {
- if (jobId != clientJobId) {
- throw std::invalid_argument("The jobId is wrong,please
check the jobId argument");
- }
+int LibYarnClient::getContainerStatuses(string &jobId, int64_t containerIds[],
+ int containerSize, list<ContainerStatus> &containerStatues)
+{
+ try {
+ if (jobId != clientJobId) {
+ throw std::invalid_argument("The jobId is wrong,please check the
jobId argument");
+ }
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat
thread has stopped.");
}
- for (int i = 0; i < containerSize; i++) {
- int64_t containerId = containerIds[i];
- map<int64_t, Container*>::iterator it =
jobIdContainers.find(containerId);
- if (it != jobIdContainers.end()) {
- try {
- Container *container = it->second;
- ostringstream key;
- key << container->getNodeId().getHost()
<< ":"<< container->getNodeId().getPort();
- Token nmToken = nmTokenCache[key.str()];
- ContainerStatus containerStatus =
((ContainerManagement*) nmClient)->getContainerStatus((*container), nmToken);
- // the response containerId will be 0
if the request containerId is not exist
- if
(containerStatus.getContainerId().getId() != 0){
-
containerStatues.push_back(containerStatus);
- }
- } catch (std::exception& e) {
-
LOG(INFO,"LibYarnClient::getContainerStatuses, getContainerStatuses Failed
Id:%ld,exception:%s",containerId, e.what());
- }
- }
- }
- return FR_SUCCEEDED;
- } catch (std::exception& e) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::getContainerStatuses, Catch the
Exception:" << e.what();
- setErrorMessage(errorMsg.str());
- return FR_FAILED;
- } catch (...) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::getContainerStatuses, catch
unexpected exception.";
- setErrorMessage(errorMsg.str());
- return FR_FAILED;
+ for (int i = 0; i < containerSize; i++) {
+ int64_t containerId = containerIds[i];
+ map<int64_t, Container*>::iterator it =
jobIdContainers.find(containerId);
+ if (it != jobIdContainers.end()) {
+ try {
+ Container *container = it->second;
+ ostringstream key;
+ key << container->getNodeId().getHost() << ":"<<
container->getNodeId().getPort();
+ Token nmToken = nmTokenCache[key.str()];
+ ContainerStatus containerStatus = ((ContainerManagement*)
nmClient)->getContainerStatus((*container), nmToken);
+ // the response containerId will be 0 if the request
containerId is not exist
+ if (containerStatus.getContainerId().getId() != 0){
+ containerStatues.push_back(containerStatus);
+ }
+ } catch (std::exception& e) {
+ LOG(INFO, "LibYarnClient::getContainerStatuses,
getContainerStatuses Failed Id:%ld,exception:%s",
+ containerId, e.what());
+ }
+ }
+ }
+ return FR_SUCCEEDED;
+ } catch (std::exception& e) {
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::getContainerStatuses, Catch the
Exception:" << e.what();
+ setErrorMessage(errorMsg.str());
+ return FR_FAILED;
+ } catch (...) {
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::getContainerStatuses, catch unexpected
exception.";
+ setErrorMessage(errorMsg.str());
+ return FR_FAILED;
}
}
int LibYarnClient::getQueueInfo(string &queue, bool includeApps,
- bool includeChildQueues, bool recursive,QueueInfo &queueInfo) {
- try{
+ bool includeChildQueues, bool recursive, QueueInfo &queueInfo)
+{
+ try {
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat
thread has stopped.");
}
- queueInfo = ((ApplicationClient*) appClient)->getQueueInfo(queue,
includeApps,
- includeChildQueues, recursive);
+ queueInfo = ((ApplicationClient*) appClient)->getQueueInfo(queue,
+ includeApps, includeChildQueues, recursive);
return FR_SUCCEEDED;
- }
- catch(std::exception& e){
+ } catch (std::exception& e) {
stringstream errorMsg;
errorMsg << "LibYarnClient::getQueueInfo, Catch the Exception:" <<
e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::getQueueInfo, catch unexpected
exception.";
- setErrorMessage(errorMsg.str());
- return FR_FAILED;
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::getQueueInfo, catch unexpected exception.";
+ setErrorMessage(errorMsg.str());
+ return FR_FAILED;
}
}
@@ -1003,19 +1011,18 @@ int LibYarnClient::getClusterNodes(list<NodeState>
&states,list<NodeReport> &nod
if (!keepRun && needHeartbeatAlive) {
throw std::runtime_error("LibYarnClient::libyarn AM heartbeat
thread has stopped.");
}
- nodeReports = ((ApplicationClient*)
appClient)->getClusterNodes(states);
+ nodeReports = ((ApplicationClient*)
appClient)->getClusterNodes(states);
return FR_SUCCEEDED;
- }
- catch(std::exception& e){
+ } catch (std::exception& e) {
stringstream errorMsg;
errorMsg << "LibYarnClient::getClusterNodes, Catch the Exception:" <<
e.what();
setErrorMessage(errorMsg.str());
return FR_FAILED;
} catch (...) {
- stringstream errorMsg;
- errorMsg << "LibYarnClient::getClusterNodes, catch unexpected
exception.";
- setErrorMessage(errorMsg.str());
- return FR_FAILED;
+ stringstream errorMsg;
+ errorMsg << "LibYarnClient::getClusterNodes, catch unexpected
exception.";
+ setErrorMessage(errorMsg.str());
+ return FR_FAILED;
}
}
}