Repository: incubator-hawq Updated Branches: refs/heads/master 856991354 -> 641176646
HAWQ-676. Apply fix for common codes to libyarn Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/64117664 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/64117664 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/64117664 Branch: refs/heads/master Commit: 641176646831ac9674e77e2f552d573e2d83853e Parents: 8569913 Author: Wen Lin <[email protected]> Authored: Fri Apr 15 10:06:18 2016 +0800 Committer: Wen Lin <[email protected]> Committed: Fri Apr 15 10:06:18 2016 +0800 ---------------------------------------------------------------------- depends/libyarn/CMake/Options.cmake | 1 + depends/libyarn/src/CMakeLists.txt | 2 +- .../libyarn/src/common/ExceptionInternal.cpp | 54 ++++++++-------- depends/libyarn/src/common/ExceptionInternal.h | 65 +++++++++++++------- depends/libyarn/src/platform.h.in | 3 +- depends/libyarn/src/rpc/RpcChannel.cpp | 17 ++--- depends/libyarn/src/rpc/RpcClient.cpp | 7 ++- .../libyarn/test/function/TestLibYarnClient.cpp | 31 +++------- .../TestMockApplicationClientProtocol.cpp | 4 +- 9 files changed, 99 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/CMake/Options.cmake ---------------------------------------------------------------------- diff --git a/depends/libyarn/CMake/Options.cmake b/depends/libyarn/CMake/Options.cmake index 5de057b..bcd3d94 100644 --- a/depends/libyarn/CMake/Options.cmake +++ b/depends/libyarn/CMake/Options.cmake @@ -22,6 +22,7 @@ OPTION(ENABLE_SSE "enable SSE4.2 buildin function" ON) OPTION(ENABLE_FRAME_POINTER "enable frame pointer on 64bit system with flag -fno-omit-frame-pointer, on 32bit system, it is always enabled" ON) OPTION(ENABLE_LIBCPP "using libc++ instead of libstdc++, only valid for clang compiler" OFF) OPTION(ENABLE_BOOST "using boost instead of native compiler c++0x support" OFF) +OPTION(STRERROR_R_RETURN_INT "checking strerror_r return type is int or not" ON) INCLUDE (CheckFunctionExists) CHECK_FUNCTION_EXISTS(dladdr HAVE_DLADDR) http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/CMakeLists.txt b/depends/libyarn/src/CMakeLists.txt index a1b2444..c2a3cff 100644 --- a/depends/libyarn/src/CMakeLists.txt +++ b/depends/libyarn/src/CMakeLists.txt @@ -2,7 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) SET(libyarn_VERSION_MAJOR 0) SET(libyarn_VERSION_MINOR 1) -SET(libyarn_VERSION_PATCH 15) +SET(libyarn_VERSION_PATCH 16) SET(libyarn_VERSION_STRING "${libyarn_VERSION_MAJOR}.${libyarn_VERSION_MINOR}.${libyarn_VERSION_PATCH}") SET(libyarn_VERSION_API 1) SET(libyarn_ROOT_SOURCES_DIR ${CMAKE_SOURCE_DIR}/src) http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/src/common/ExceptionInternal.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/common/ExceptionInternal.cpp b/depends/libyarn/src/common/ExceptionInternal.cpp index 40f94e4..f559214 100644 --- a/depends/libyarn/src/common/ExceptionInternal.cpp +++ b/depends/libyarn/src/common/ExceptionInternal.cpp @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ +#include "platform.h" #include "Exception.h" #include "ExceptionInternal.h" @@ -40,24 +41,18 @@ bool CheckOperationCanceled() { } const char * GetSystemErrorInfo(int eno) { - static THREAD_LOCAL char buffer[64]; static THREAD_LOCAL char message[64]; + char buffer[64], *pbuffer; + pbuffer = buffer; +#ifdef STRERROR_R_RETURN_INT strerror_r(eno, buffer, sizeof(buffer)); - snprintf(message, sizeof(message), "(errno: %d) %s", eno, buffer); +#else + pbuffer = strerror_r(eno, buffer, sizeof(buffer)); +#endif + snprintf(message, sizeof(message), "(errno: %d) %s", eno, pbuffer); return message; } -static THREAD_LOCAL std::string * MessageBuffer = NULL; -static THREAD_LOCAL once_flag once; - -static void CreateMessageBuffer() { - MessageBuffer = new std::string; -} - -static void InitMessageBuffer() { - call_once(once, &CreateMessageBuffer); - assert(MessageBuffer != NULL); -} static void GetExceptionDetailInternal(const Yarn::YarnException & e, std::stringstream & ss, bool topLevel); @@ -104,25 +99,25 @@ static void GetExceptionDetailInternal(const Yarn::YarnException & e, } } -const char * GetExceptionDetail(const Yarn::YarnException & e) { - std::stringstream ss; - GetExceptionDetailInternal(e, ss, true); - - try { - InitMessageBuffer(); - *MessageBuffer = ss.str(); - } catch (const std::bad_alloc & e) { +const char * GetExceptionDetail(const Yarn::YarnException & e, + std::string& buffer) { + try { + std::stringstream ss; + ss.imbue(std::locale::classic()); + GetExceptionDetailInternal(e, ss, true); + buffer = ss.str(); + } catch (const std::bad_alloc& e) { return "Out of memory"; } - return MessageBuffer->c_str(); + return buffer.c_str(); } -const char * GetExceptionDetail(const exception_ptr e) { +const char * GetExceptionDetail(const exception_ptr e, std::string& buffer) { std::stringstream ss; + ss.imbue(std::locale::classic()); try { - InitMessageBuffer(); Yarn::rethrow_exception(e); } catch (const Yarn::YarnException & nested) { GetExceptionDetailInternal(nested, ss, true); @@ -131,12 +126,12 @@ const char * GetExceptionDetail(const exception_ptr e) { } try { - *MessageBuffer = ss.str(); - } catch (const std::bad_alloc & e) { + buffer = ss.str(); + } catch (const std::bad_alloc& e) { return "Out of memory"; } - return MessageBuffer->c_str(); + return buffer.c_str(); } static void GetExceptionMessage(const std::exception & e, @@ -156,7 +151,7 @@ static void GetExceptionMessage(const std::exception & e, } try { - Yarn::rethrow_if_nested(e); + Yarn::rethrow_if_nested(e); } catch (const std::exception & nested) { GetExceptionMessage(nested, ss, recursive + 1); } @@ -164,9 +159,10 @@ static void GetExceptionMessage(const std::exception & e, const char * GetExceptionMessage(const exception_ptr e, std::string & buffer) { std::stringstream ss; + ss.imbue(std::locale::classic()); try { - Yarn::rethrow_exception(e); + Yarn::rethrow_exception(e); } catch (const std::bad_alloc & e) { return "Out of memory"; } catch (const std::exception & e) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/src/common/ExceptionInternal.h ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/common/ExceptionInternal.h b/depends/libyarn/src/common/ExceptionInternal.h index a45346d..930ee71 100644 --- a/depends/libyarn/src/common/ExceptionInternal.h +++ b/depends/libyarn/src/common/ExceptionInternal.h @@ -56,10 +56,29 @@ namespace Yarn { using boost::exception_ptr; using boost::rethrow_exception; using boost::current_exception; +} + +#else +#include <exception> +#include <stdexcept> + +namespace Yarn { +using std::rethrow_exception; +using std::current_exception; +using std::make_exception_ptr; +using std::exception_ptr; +} +#endif // include headers +#if defined(NEED_BOOST) || !defined(HAVE_NESTED_EXCEPTION) // define nested exception +namespace Yarn { +#ifdef NEED_BOOST class nested_exception : virtual public boost::exception { +#else +class nested_exception : virtual public std::exception { +#endif public: - nested_exception() : p(boost::current_exception()) { + nested_exception() : p(current_exception()) { } nested_exception(const nested_exception & other) : p(other.p) { @@ -73,14 +92,14 @@ public: virtual ~nested_exception() throw() {} void rethrow_nested() const { - boost::rethrow_exception(p); + rethrow_exception(p); } - boost::exception_ptr nested_ptr() const { + exception_ptr nested_ptr() const { return p; } protected: - boost::exception_ptr p; + exception_ptr p; }; template<typename BaseType> @@ -96,7 +115,11 @@ static inline void throw_with_nested(T const & e) { std::terminate(); } +#ifdef NEED_BOOST boost::throw_exception(ExceptionWrapper<T>(static_cast < T const & >(e))); +#else + throw ExceptionWrapper<T>(static_cast < T const & >(e)); +#endif } template<typename T> @@ -113,6 +136,16 @@ static inline void rethrow_if_nested(const nested_exception & e) { e.rethrow_nested(); } +} // namespace Yarn +#else // not boost and have nested exception +namespace Yarn { +using std::throw_with_nested; +using std::rethrow_if_nested; +} // namespace Yarn +#endif // define nested exception + +#ifdef NEED_BOOST +namespace Yarn { namespace Internal { @@ -150,24 +183,13 @@ void ThrowException(bool nested, const char * f, int l, throw std::logic_error("should not reach here."); } -} -} +} // namespace Internal +} // namespace Yarn #else -#include <exception> -#include <stdexcept> - namespace Yarn { - -using std::rethrow_exception; -using std::current_exception; -using std::make_exception_ptr; -using std::throw_with_nested; -using std::rethrow_if_nested; -using std::exception_ptr; - namespace Internal { template<typename THROWABLE> @@ -204,8 +226,8 @@ void ThrowException(bool nested, const char * f, int l, throw std::logic_error("should not reach here."); } -} -} +} // namespace Internal +} // namespace Yarn #endif @@ -237,7 +259,8 @@ bool CheckOperationCanceled(); * @param e The exception which detail message to be return. * @return The exception's detail message. */ -const char * GetExceptionDetail(const Yarn::YarnException & e); +const char * GetExceptionDetail(const Yarn::YarnException & e, + std::string &buffer); /** * Get a exception's detail message. @@ -245,7 +268,7 @@ const char * GetExceptionDetail(const Yarn::YarnException & e); * @param e The exception which detail message to be return. * @return The exception's detail message. */ -const char * GetExceptionDetail(const exception_ptr e); +const char * GetExceptionDetail(const exception_ptr e, std::string &buffer); const char * GetExceptionMessage(const exception_ptr e, std::string & buffer); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/src/platform.h.in ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/platform.h.in b/depends/libyarn/src/platform.h.in index 86d04e5..de2d294 100644 --- a/depends/libyarn/src/platform.h.in +++ b/depends/libyarn/src/platform.h.in @@ -30,7 +30,8 @@ #cmakedefine ENABLE_FRAME_POINTER #cmakedefine HAVE_SYMBOLIZE #cmakedefine NEED_BOOST - +#cmakedefine STRERROR_R_RETURN_INT +#cmakedefine HAVE_NESTED_EXCEPTION // defined by gcc #if defined(__ELF__) && defined(OS_LINUX) # define HAVE_SYMBOLIZE http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/src/rpc/RpcChannel.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/rpc/RpcChannel.cpp b/depends/libyarn/src/rpc/RpcChannel.cpp index 81d11d1..814541c 100644 --- a/depends/libyarn/src/rpc/RpcChannel.cpp +++ b/depends/libyarn/src/rpc/RpcChannel.cpp @@ -248,6 +248,7 @@ void RpcChannelImpl::connect() { exception_ptr lastError; const RpcConfig & conf = key.getConf(); const RpcServerInfo & server = key.getServer(); + std::string buffer; for (int i = 0; i < conf.getMaxRetryOnConnect(); ++i) { RpcAuth auth = key.getAuth(); @@ -294,19 +295,19 @@ void RpcChannelImpl::connect() { lastError = current_exception(); LOG(LOG_ERROR, "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s", - server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e)); + server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer)); } catch (const YarnNetworkException & e) { sleep = 1; lastError = current_exception(); LOG(LOG_ERROR, "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s", - server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e)); + server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer)); } catch (const YarnTimeoutException & e) { sleep = 1; lastError = current_exception(); LOG(LOG_ERROR, "Failed to setup RPC connection to \"%s:%s\" caused by:\n%s", - server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e)); + server.getHost().c_str(), server.getPort().c_str(), GetExceptionDetail(e, buffer)); } if (i + 1 < conf.getMaxRetryOnConnect()) { @@ -429,11 +430,12 @@ void RpcChannelImpl::invoke(const RpcCall & call) { if (!retry && call.isIdempotent()) { retry = true; + std::string buffer; LOG(LOG_ERROR, "Failed to invoke RPC call \"%s\" on server \"%s:%s\": \n%s", call.getName(), key.getServer().getHost().c_str(), key.getServer().getPort().c_str(), - GetExceptionDetail(lastError)); + GetExceptionDetail(lastError, buffer)); LOG(INFO, "Retry idempotent RPC call \"%s\" on server \"%s:%s\"", call.getName(), key.getServer().getHost().c_str(), @@ -598,12 +600,13 @@ bool RpcChannelImpl::checkIdle() { sendPing(); } } catch (...) { + std::string buffer; LOG(LOG_ERROR, "Failed to send ping via idle RPC channel to server \"%s:%s\": " "\n%s", key.getServer().getHost().c_str(), key.getServer().getPort().c_str(), - GetExceptionDetail(current_exception())); + GetExceptionDetail(current_exception(), buffer)); sock->close(); return true; } @@ -751,8 +754,8 @@ void RpcChannelImpl::readOneResponse(bool writeLock) { std::vector<char> buffer(128); hadoop::common::RpcResponseHeaderProto curRespHeader; hadoop::common::RpcResponseHeaderProto::RpcStatusProto status; - uint32_t totalen, headerSize = 0, bodySize = 0; - totalen = in->readBigEndianInt32(readTimeout); + uint32_t headerSize = 0, bodySize = 0; + in->readBigEndianInt32(readTimeout); /* * read response header */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/src/rpc/RpcClient.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/src/rpc/RpcClient.cpp b/depends/libyarn/src/rpc/RpcClient.cpp index ae93edc..3dacaa0 100644 --- a/depends/libyarn/src/rpc/RpcClient.cpp +++ b/depends/libyarn/src/rpc/RpcClient.cpp @@ -95,8 +95,9 @@ void RpcClientImpl::clean() { } } } catch (const Yarn::YarnException & e) { + std::string buffer; LOG(LOG_ERROR, "RpcClientImpl's idle cleaner exit: %s", - GetExceptionDetail(e)); + GetExceptionDetail(e, buffer)); } catch (const std::exception & e) { LOG(LOG_ERROR, "RpcClientImpl's idle cleaner exit: %s", e.what()); } @@ -146,8 +147,6 @@ RpcChannel & RpcClientImpl::getChannel(const RpcAuth & auth, allChannels[key] = rc; } - rc->addRef(); - if (!cleaning) { cleaning = true; @@ -157,6 +156,8 @@ RpcChannel & RpcClientImpl::getChannel(const RpcAuth & auth, CREATE_THREAD(cleaner, bind(&RpcClientImpl::clean, this)); } + // increase ref count after successfully done without any exception + rc->addRef(); } catch (const YarnRpcException & e) { throw; } catch (...) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/test/function/TestLibYarnClient.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/test/function/TestLibYarnClient.cpp b/depends/libyarn/test/function/TestLibYarnClient.cpp index 3043ea9..5260775 100644 --- a/depends/libyarn/test/function/TestLibYarnClient.cpp +++ b/depends/libyarn/test/function/TestLibYarnClient.cpp @@ -29,15 +29,16 @@ using namespace libyarn; class TestLibYarnClient: public ::testing::Test { public: TestLibYarnClient(){ + string user_name("postgres"); string rmHost("localhost"); - string rmPort("9980"); + string rmPort("8032"); string schedHost("localhost"); - string schedPort("9981"); + string schedPort("8030"); string amHost("localhost"); int32_t amPort = 0; string am_tracking_url("url"); int heartbeatInterval = 1000; - client = new LibYarnClient(rmHost, rmPort, schedHost, schedPort, amHost, amPort, am_tracking_url,heartbeatInterval); + client = new LibYarnClient(user_name, rmHost, rmPort, schedHost, schedPort, amHost, amPort, am_tracking_url, heartbeatInterval); } ~TestLibYarnClient(){ } @@ -47,33 +48,21 @@ protected: TEST_F(TestLibYarnClient,TestLibYarn){ string jobName("libyarn"); - string queue("sample_queue"); + string queue("default"); string jobId(""); int result = client->createJob(jobName, queue,jobId); EXPECT_EQ(result,0); - ResourceRequest resRequest; - string host("*"); - resRequest.setResourceName(host); - Resource capability; - capability.setVirtualCores(1); - capability.setMemory(1024); - resRequest.setCapability(capability); - resRequest.setNumContainers(3); - resRequest.setRelaxLocality(true); - Priority priority; - priority.setPriority(1); - resRequest.setPriority(priority); list<string> blackListAdditions; list<string> blackListRemovals; list<Container> allocatedResourcesArray; - result = client->allocateResources(jobId, resRequest, blackListAdditions, blackListRemovals,allocatedResourcesArray,5); + result = client->allocateResources(jobId, blackListAdditions, blackListRemovals,allocatedResourcesArray,5); EXPECT_EQ(result,0); int allocatedResourceArraySize = allocatedResourcesArray.size(); - int activeContainerIds[allocatedResourceArraySize]; - int releaseContainerIds[allocatedResourceArraySize]; - int statusContainerIds[allocatedResourceArraySize]; + int64_t activeContainerIds[allocatedResourceArraySize]; + int64_t releaseContainerIds[allocatedResourceArraySize]; + int64_t statusContainerIds[allocatedResourceArraySize]; int i = 0; for (list<Container>::iterator it = allocatedResourcesArray.begin();it != allocatedResourcesArray.end();it++){ activeContainerIds[i] = it->getId().getId(); @@ -86,7 +75,7 @@ TEST_F(TestLibYarnClient,TestLibYarn){ sleep(1); - set<int> activeFailIds; + set<int64_t> activeFailIds; result = client->getActiveFailContainerIds(activeFailIds); EXPECT_EQ(result,0); EXPECT_EQ(activeFailIds.size(),0); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/64117664/depends/libyarn/test/function/TestMockApplicationClientProtocol.cpp ---------------------------------------------------------------------- diff --git a/depends/libyarn/test/function/TestMockApplicationClientProtocol.cpp b/depends/libyarn/test/function/TestMockApplicationClientProtocol.cpp index 8acb039..24a110a 100644 --- a/depends/libyarn/test/function/TestMockApplicationClientProtocol.cpp +++ b/depends/libyarn/test/function/TestMockApplicationClientProtocol.cpp @@ -29,14 +29,14 @@ using std::string; class TestMockApplicationClientProtocol: public ::testing::Test { public: TestMockApplicationClientProtocol(){ + string user_name("postgres"); string rmHost("localhost"); string rmPort("8032"); string tokenService = ""; Yarn::Config config; Yarn::Internal::SessionConfig sessionConfig(config); Yarn::Internal::UserInfo user = Yarn::Internal::UserInfo::LocalUser(); - Yarn::Internal::RpcAuth rpcAuth(user, Yarn::Internal::AuthMethod::SIMPLE); - protocol = new MockApplicationClientProtocol(rmHost,rmPort,tokenService, sessionConfig,rpcAuth); + protocol = new MockApplicationClientProtocol(user_name, rmHost,rmPort,tokenService, sessionConfig); } ~TestMockApplicationClientProtocol(){ delete protocol;
