Repository: incubator-hawq Updated Branches: refs/heads/master 66f0bda1f -> f5b0cadb9
HAWQ-715. fix libyarn HA configuration bug Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/f5b0cadb Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/f5b0cadb Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/f5b0cadb Branch: refs/heads/master Commit: f5b0cadb9ecce7b73a19040e1115308e559d6f65 Parents: 66f0bda Author: Wen Lin <[email protected]> Authored: Wed Jun 15 09:33:42 2016 +0800 Committer: Wen Lin <[email protected]> Committed: Wed Jun 15 09:33:42 2016 +0800 ---------------------------------------------------------------------- .../src/libyarnclient/ApplicationClient.cpp | 37 ++++++++++++++++---- .../src/libyarnclient/ApplicationMaster.cpp | 29 +++++++++++++-- 2 files changed, 57 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5b0cadb/depends/libyarn/src/libyarnclient/ApplicationClient.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/libyarnclient/ApplicationClient.cpp b/depends/libyarn/src/libyarnclient/ApplicationClient.cpp index 293365f..ecbaf44 100644 --- a/depends/libyarn/src/libyarnclient/ApplicationClient.cpp +++ b/depends/libyarn/src/libyarnclient/ApplicationClient.cpp @@ -53,23 +53,48 @@ std::vector<RMInfo> RMInfo::getHARMInfo(const Yarn::Config & conf, const char* n LOG(INFO, "Yarn RM HA is not configured."); } -return retval; + return retval; } ApplicationClient::ApplicationClient(string &user, string &host, string &port) { + std::vector<RMInfo> rmConfInfos, rmInfos; + RMInfo activeRM; std::string tokenService = ""; + Yarn::Internal::shared_ptr<Yarn::Config> conf = DefaultConfig().getConfig(); Yarn::Internal::SessionConfig sessionConfig(*conf); - LOG(INFO, "ApplicationClient session auth method : %s", sessionConfig.getRpcAuthMethod().c_str()); - - std::vector<RMInfo> rmInfos = RMInfo::getHARMInfo(*conf, YARN_RESOURCEMANAGER_HA); + LOG(INFO, "ApplicationClient session auth method : %s", + sessionConfig.getRpcAuthMethod().c_str()); + + activeRM.setHost(host); + activeRM.setPort(port); + rmInfos.push_back(activeRM); + rmConfInfos = RMInfo::getHARMInfo(*conf, YARN_RESOURCEMANAGER_HA); + + /* build a list of candidate RMs without duplicate */ + for (vector<RMInfo>::iterator it = rmConfInfos.begin(); + it != rmConfInfos.end(); it++) { + bool found = false; + for (vector<RMInfo>::iterator it2 = rmInfos.begin(); + it2 != rmInfos.end(); it2++) { + if (it2->getHost() == it->getHost() + && it2->getPort() == it->getPort()) { + found = true; + break; + } + } + if (!found) + rmInfos.push_back(*it); + } if (rmInfos.size() <= 1) { LOG(INFO, "ApplicationClient Resource Manager HA is disable."); enableRMHA = false; maxRMHARetry = 0; } else { - LOG(INFO, "ApplicationClient Resource Manager HA is enable. Number of RM: %d", rmInfos.size()); + LOG(INFO, + "ApplicationClient Resource Manager HA is enable. Number of RM: %d", + rmInfos.size()); enableRMHA = true; maxRMHARetry = sessionConfig.getRpcMaxHaRetry(); } @@ -88,7 +113,7 @@ ApplicationClient::ApplicationClient(string &user, string &host, string &port) { std::shared_ptr<ApplicationClientProtocol>( new ApplicationClientProtocol( user, rmInfos[i].getHost(),rmInfos[i].getPort(), tokenService, sessionConfig))); - LOG(INFO, "ApplicationClient finds a standby RM, host:%s, port:%s", + LOG(INFO, "ApplicationClient finds a candidate RM, host:%s, port:%s", rmInfos[i].getHost().c_str(), rmInfos[i].getPort().c_str()); } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f5b0cadb/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp b/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp index ae96f8d..08d8cdb 100644 --- a/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp +++ b/depends/libyarn/src/libyarnclient/ApplicationMaster.cpp @@ -32,18 +32,41 @@ const char * YARN_RESOURCEMANAGER_SCHEDULER_HA = "yarn.resourcemanager.scheduler ApplicationMaster::ApplicationMaster(string &schedHost, string &schedPort, UserInfo &user, const string &tokenService) { + std::vector<RMInfo> rmInfos, rmConfInfos; Yarn::Internal::shared_ptr<Yarn::Config> conf = DefaultConfig().getConfig(); Yarn::Internal::SessionConfig sessionConfig(*conf); RpcAuth rpcAuth(user, AuthMethod::TOKEN); - std::vector<RMInfo> rmInfos = RMInfo::getHARMInfo(*conf, YARN_RESOURCEMANAGER_SCHEDULER_HA); + RMInfo activeRM; + activeRM.setHost(schedHost); + activeRM.setPort(schedPort); + rmInfos.push_back(activeRM); + rmConfInfos = RMInfo::getHARMInfo(*conf, YARN_RESOURCEMANAGER_SCHEDULER_HA); + + /* build a list of candidate RMs without duplicate */ + for (vector<RMInfo>::iterator it = rmConfInfos.begin(); + it != rmConfInfos.end(); it++) { + bool found = false; + for (vector<RMInfo>::iterator it2 = rmInfos.begin(); + it2 != rmInfos.end(); it2++) { + if (it2->getHost() == it->getHost() + && it2->getPort() == it->getPort()) { + found = true; + break; + } + } + if (!found) + rmInfos.push_back(*it); + } if (rmInfos.size() <= 1) { LOG(INFO, "ApplicationClient RM Scheduler HA is disable."); enableRMSchedulerHA = false; maxRMHARetry = 0; } else { - LOG(INFO, "ApplicationClient RM Scheduler HA is enable. Number of RM scheduler: %d", rmInfos.size()); + LOG(INFO, + "ApplicationClient RM Scheduler HA is enable. Number of RM scheduler: %d", + rmInfos.size()); enableRMSchedulerHA = true; maxRMHARetry = sessionConfig.getRpcMaxHaRetry(); } @@ -63,7 +86,7 @@ ApplicationMaster::ApplicationMaster(string &schedHost, string &schedPort, std::shared_ptr<ApplicationMasterProtocol>( new ApplicationMasterProtocol(rmInfos[i].getHost(), rmInfos[i].getPort(), tokenService, sessionConfig, rpcAuth))); - LOG(INFO, "ApplicationMaster finds a standby RM scheduler, host:%s, port:%s", + LOG(INFO, "ApplicationMaster finds a candidate RM scheduler, host:%s, port:%s", rmInfos[i].getHost().c_str(), rmInfos[i].getPort().c_str()); } }
