IMPALA-5537: Retry RPC on some exceptions with SSL connections

After the fix for IMPALA-5388, every TSSLException thrown is treated as a fatal error and the query fails. It turns out that this is too strict: in a secure cluster under load, queries can easily hit timeouts while waiting for RPC responses.
When running without SSL, we call RetryRpcRecv() to retry the recv part of an RPC if the TSocket underlying the RPC gets an EAGAIN during recv(). This change extends that logic to cover secure connections. In particular, we pattern match against the exception string "SSL_read: Resource temporarily unavailable", which corresponds to an EAGAIN error in the SSL_read() path. Similarly, we handle a closed connection in the send() path of a secure connection by pattern matching against the exception string "TTransportException: Transport not open". To verify that the exception is thrown during the send part of an RPC call, the RPC client interface has been augmented to take a bool* argument which is set to true after the send part of the RPC has completed but before the recv part starts. If DoRpc() catches an exception and the send part isn't done yet, the entire RPC is retried if the exception string matches certain substrings which are known to be safe to retry. The fault injection utility has also been updated to distinguish between timeouts and lost connections in order to exercise different error-handling paths in the send and recv paths.
Change-Id: I8243d4cac93c453e9396b0e24f41e147c8637b8c Reviewed-on: http://gerrit.cloudera.org:8080/7229 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/23565166 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/23565166 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/23565166 Branch: refs/heads/master Commit: 23565166886d7e7b33b477e391c3e6b658a80b32 Parents: 53287df Author: Michael Ho <[email protected]> Authored: Mon Jun 19 19:56:59 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Jun 21 10:04:15 2017 +0000 ---------------------------------------------------------------------- be/src/catalog/catalog-service-client-wrapper.h | 103 +++++++++++++++++++ be/src/exec/catalog-op-executor.cc | 17 +-- be/src/rpc/thrift-server-test.cc | 55 +++++----- be/src/rpc/thrift-util.cc | 22 ++-- be/src/runtime/backend-client.h | 52 +++++++++- be/src/runtime/client-cache-types.h | 6 +- be/src/runtime/client-cache.h | 14 ++- be/src/service/client-request-state.cc | 4 +- .../statestore-service-client-wrapper.h | 55 ++++++++++ .../statestore-subscriber-client-wrapper.h | 64 ++++++++++++ be/src/statestore/statestore-subscriber.cc | 9 +- be/src/statestore/statestore-subscriber.h | 3 +- be/src/statestore/statestore.cc | 17 +-- be/src/statestore/statestore.h | 7 +- be/src/testutil/fault-injection-util.cc | 36 ++++--- be/src/testutil/fault-injection-util.h | 12 ++- tests/custom_cluster/test_rpc_exception.py | 41 ++++++-- 17 files changed, 425 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/catalog/catalog-service-client-wrapper.h ---------------------------------------------------------------------- diff --git 
a/be/src/catalog/catalog-service-client-wrapper.h b/be/src/catalog/catalog-service-client-wrapper.h new file mode 100644 index 0000000..22ac56d --- /dev/null +++ b/be/src/catalog/catalog-service-client-wrapper.h @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef CATALOG_CATALOG_SERVICE_CLIENT_WRAPPER_H +#define CATALOG_CATALOG_SERVICE_CLIENT_WRAPPER_H + +#include "gen-cpp/CatalogService.h" + +namespace impala { + +class CatalogServiceClientWrapper : public CatalogServiceClient { + public: + CatalogServiceClientWrapper( + boost::shared_ptr<::apache::thrift::protocol::TProtocol> prot) + : CatalogServiceClient(prot) { + } + + CatalogServiceClientWrapper( + boost::shared_ptr<::apache::thrift::protocol::TProtocol> iprot, + boost::shared_ptr<::apache::thrift::protocol::TProtocol> oprot) + : CatalogServiceClient(iprot, oprot) { + } + +/// We intentionally disable this clang warning as we intend to hide the +/// the same-named functions defined in the base class. 
+#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Woverloaded-virtual" + + void ExecDdl(TDdlExecResponse& _return, const TDdlExecRequest& req, bool* send_done) { + DCHECK(!*send_done); + send_ExecDdl(req); + *send_done = true; + recv_ExecDdl(_return); + } + + void GetCatalogObject(TGetCatalogObjectResponse& _return, + const TGetCatalogObjectRequest& req, bool* send_done) { + DCHECK(!*send_done); + send_GetCatalogObject(req); + *send_done = true; + recv_GetCatalogObject(_return); + } + + void ResetMetadata(TResetMetadataResponse& _return, const TResetMetadataRequest& req, + bool* send_done) { + DCHECK(!*send_done); + send_ResetMetadata(req); + *send_done = true; + recv_ResetMetadata(_return); + } + + void UpdateCatalog(TUpdateCatalogResponse& _return, const TUpdateCatalogRequest& req, + bool* send_done) { + DCHECK(!*send_done); + send_UpdateCatalog(req); + *send_done = true; + recv_UpdateCatalog(_return); + } + + void GetFunctions(TGetFunctionsResponse& _return, const TGetFunctionsRequest& req, + bool* send_done) { + DCHECK(!*send_done); + send_GetFunctions(req); + *send_done = true; + recv_GetFunctions(_return); + } + + void PrioritizeLoad(TPrioritizeLoadResponse& _return, const TPrioritizeLoadRequest& req, + bool* send_done) { + DCHECK(!*send_done); + send_PrioritizeLoad(req); + *send_done = true; + recv_PrioritizeLoad(_return); + } + + void SentryAdminCheck(TSentryAdminCheckResponse& _return, + const TSentryAdminCheckRequest& req, bool* send_done) { + DCHECK(!*send_done); + send_SentryAdminCheck(req); + *send_done = true; + recv_SentryAdminCheck(_return); + } + +#pragma clang diagnostic pop +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/exec/catalog-op-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc index 11c21cc..252d345 100644 --- a/be/src/exec/catalog-op-executor.cc +++ 
b/be/src/exec/catalog-op-executor.cc @@ -19,8 +19,9 @@ #include <sstream> -#include "exec/incr-stats-util.h" #include "common/status.h" +#include "catalog/catalog-service-client-wrapper.h" +#include "exec/incr-stats-util.h" #include "runtime/lib-cache.h" #include "runtime/client-cache-types.h" #include "runtime/exec-env.h" @@ -59,8 +60,8 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) { DCHECK(request.ddl_params.ddl_type != TDdlType::COMPUTE_STATS); exec_response_.reset(new TDdlExecResponse()); - RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::ExecDdl, request.ddl_params, - exec_response_.get())); + RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClientWrapper::ExecDdl, + request.ddl_params, exec_response_.get())); catalog_update_result_.reset( new TCatalogUpdateResult(exec_response_.get()->result)); Status status(exec_response_->result.status); @@ -75,7 +76,7 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) { } case TCatalogOpType::RESET_METADATA: { TResetMetadataResponse response; - RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::ResetMetadata, + RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClientWrapper::ResetMetadata, request.reset_metadata_params, &response)); catalog_update_result_.reset(new TCatalogUpdateResult(response.result)); return Status(response.result.status); @@ -264,7 +265,7 @@ Status CatalogOpExecutor::GetCatalogObject(const TCatalogObject& object_desc, TGetCatalogObjectResponse response; RETURN_IF_ERROR( - client.DoRpc(&CatalogServiceClient::GetCatalogObject, request, &response)); + client.DoRpc(&CatalogServiceClientWrapper::GetCatalogObject, request, &response)); *result = response.catalog_object; return Status::OK(); } @@ -276,7 +277,8 @@ Status CatalogOpExecutor::PrioritizeLoad(const TPrioritizeLoadRequest& req, Status status; CatalogServiceConnection client(env_->catalogd_client_cache(), address, &status); RETURN_IF_ERROR(status); - RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::PrioritizeLoad, 
req, result)); + RETURN_IF_ERROR( + client.DoRpc(&CatalogServiceClientWrapper::PrioritizeLoad, req, result)); return Status::OK(); } @@ -287,6 +289,7 @@ Status CatalogOpExecutor::SentryAdminCheck(const TSentryAdminCheckRequest& req) CatalogServiceConnection client(env_->catalogd_client_cache(), address, &cnxn_status); RETURN_IF_ERROR(cnxn_status); TSentryAdminCheckResponse resp; - RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClient::SentryAdminCheck, req, &resp)); + RETURN_IF_ERROR( + client.DoRpc(&CatalogServiceClientWrapper::SentryAdminCheck, req, &resp)); return Status(resp.status); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/rpc/thrift-server-test.cc ---------------------------------------------------------------------- diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc index 21e4e81..f6edfb2 100644 --- a/be/src/rpc/thrift-server-test.cc +++ b/be/src/rpc/thrift-server-test.cc @@ -71,12 +71,12 @@ int GetServerPort() { TEST(ThriftServer, Connectivity) { int port = GetServerPort(); - ThriftClient<StatestoreServiceClient> wrong_port_client("localhost", + ThriftClient<StatestoreServiceClientWrapper> wrong_port_client("localhost", port, "", NULL, false); ASSERT_FALSE(wrong_port_client.Open().ok()); - ThriftServer* server = new ThriftServer("DummyStatestore", MakeProcessor(), port, NULL, - NULL, 5); + ThriftServer* server = + new ThriftServer("DummyStatestore", MakeProcessor(), port, NULL, NULL, 5); ASSERT_OK(server->Start()); // Test that client recovers from failure to connect. 
@@ -95,30 +95,33 @@ TEST(SslTest, Connectivity) { ASSERT_OK(server->Start()); FLAGS_ssl_client_ca_certificate = SERVER_CERT; - ThriftClient<StatestoreServiceClient> ssl_client( + ThriftClient<StatestoreServiceClientWrapper> ssl_client( "localhost", port, "", NULL, true); ASSERT_OK(ssl_client.Open()); TRegisterSubscriberResponse resp; - EXPECT_NO_THROW({ - ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest()); + bool send_done = false; + EXPECT_NO_THROW({ssl_client.iface()->RegisterSubscriber(resp, + TRegisterSubscriberRequest(), &send_done); }); // Disable SSL for this client. - ThriftClient<StatestoreServiceClient> non_ssl_client( + ThriftClient<StatestoreServiceClientWrapper> non_ssl_client( "localhost", port, "", NULL, false); ASSERT_OK(non_ssl_client.Open()); + send_done = false; EXPECT_THROW(non_ssl_client.iface()->RegisterSubscriber( - resp, TRegisterSubscriberRequest()), TTransportException); + resp, TRegisterSubscriberRequest(), &send_done), TTransportException); } TEST(SslTest, BadCertificate) { FLAGS_ssl_client_ca_certificate = "unknown"; int port = GetServerPort(); - ThriftClient<StatestoreServiceClient> ssl_client("localhost", port, "", NULL, true); + ThriftClient<StatestoreServiceClientWrapper> + ssl_client("localhost", port, "", NULL, true); ASSERT_FALSE(ssl_client.Open().ok()); - ThriftServer* server = new ThriftServer("DummyStatestore", MakeProcessor(), port, NULL, - NULL, 5); + ThriftServer* server = + new ThriftServer("DummyStatestore", MakeProcessor(), port, NULL, NULL, 5); ASSERT_OK(server->EnableSsl(SERVER_CERT, PRIVATE_KEY, "echo password")); ASSERT_OK(server->Start()); @@ -136,12 +139,13 @@ TEST(PasswordProtectedPemFile, CorrectOperation) { SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, "echo password")); ASSERT_OK(server->Start()); FLAGS_ssl_client_ca_certificate = SERVER_CERT; - ThriftClient<StatestoreServiceClient> ssl_client("localhost", port, "", NULL, true); + ThriftClient<StatestoreServiceClientWrapper> + 
ssl_client("localhost", port, "", NULL, true); ASSERT_OK(ssl_client.Open()); TRegisterSubscriberResponse resp; - EXPECT_NO_THROW({ - ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest()); - }); + bool send_done = false; + EXPECT_NO_THROW({ssl_client.iface()->RegisterSubscriber(resp, + TRegisterSubscriberRequest(), &send_done);}); } TEST(PasswordProtectedPemFile, BadPassword) { @@ -163,15 +167,17 @@ TEST(SslTest, ClientBeforeServer) { // Instantiate a thrift client before a thrift server and test if it works (IMPALA-2747) FLAGS_ssl_client_ca_certificate = SERVER_CERT; int port = GetServerPort(); - ThriftClient<StatestoreServiceClient> ssl_client("localhost", port, "", NULL, true); - ThriftServer* server = new ThriftServer("DummyStatestore", MakeProcessor(), port, NULL, - NULL, 5); + ThriftClient<StatestoreServiceClientWrapper> + ssl_client("localhost", port, "", NULL, true); + ThriftServer* server = + new ThriftServer("DummyStatestore", MakeProcessor(), port, NULL, NULL, 5); ASSERT_OK(server->EnableSsl(SERVER_CERT, PRIVATE_KEY)); ASSERT_OK(server->Start()); ASSERT_OK(ssl_client.Open()); + bool send_done = false; TRegisterSubscriberResponse resp; - ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest()); + ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest(), &send_done); } /// Test disabled because requires a high ulimit -n on build machines. 
Since the test does @@ -203,18 +209,19 @@ TEST(NoPasswordPemFile, BadServerCertificate) { EXPECT_OK(server->EnableSsl(BAD_SERVER_CERT, BAD_PRIVATE_KEY)); EXPECT_OK(server->Start()); FLAGS_ssl_client_ca_certificate = SERVER_CERT; - ThriftClient<StatestoreServiceClient> ssl_client( + ThriftClient<StatestoreServiceClientWrapper> ssl_client( "localhost", FLAGS_state_store_port + 5, "", NULL, true); EXPECT_OK(ssl_client.Open()); TRegisterSubscriberResponse resp; - EXPECT_THROW({ - ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest()); + bool send_done = false; + EXPECT_THROW({ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest(), + &send_done); }, TSSLException); // Close and reopen the socket ssl_client.Close(); EXPECT_OK(ssl_client.Open()); - EXPECT_THROW({ - ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest()); + EXPECT_THROW({ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest(), + &send_done); }, TSSLException); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/rpc/thrift-util.cc ---------------------------------------------------------------------- diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc index 7ab5cd8..4e3f1b6 100644 --- a/be/src/rpc/thrift-util.cc +++ b/be/src/rpc/thrift-util.cc @@ -188,22 +188,28 @@ bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress& bool IsRecvTimeoutTException(const TTransportException& e) { // String taken from TSocket::read() Thrift's TSocket.cpp. 
- return e.getType() == TTransportException::TIMED_OUT && - strstr(e.what(), "EAGAIN (timed out)") != nullptr; + return (e.getType() == TTransportException::TIMED_OUT && + strstr(e.what(), "EAGAIN (timed out)") != nullptr) || + (e.getType() == TTransportException::INTERNAL_ERROR && + strstr(e.what(), "SSL_read: Resource temporarily unavailable") != nullptr); } // This function implements some heuristics to match against exception details -// thrown by write_partial() in thrift library. This is not very robust as it's -// possible that the receiver of the RPC call may have received all the RPC payload -// but the ACK to the sender may have been dropped somehow. In which case, it's -// not safe to retry the RPC if it's not idempotent. -// TODO: end-to-end tracking of RPC calls to detect duplicated calls in the receiver side +// thrown by functions in TSocket.cpp and TSSLSocket.cpp in thrift library. It's +// expected the caller (e.g. DoRpc()) has already verified the send part of the RPC +// didn't complete. It's only safe to retry an RPC if the send part didn't complete. +// It's also expected that the RPC client will close the existing connection and reopen +// a new connection before retrying the RPC. If the exception occurs after the send part +// is done, only the recv part of the RPC can be retried. bool IsSendFailTException(const TTransportException& e) { // String taken from TSocket::write_partial() in Thrift's TSocket.cpp return (e.getType() == TTransportException::TIMED_OUT && strstr(e.what(), "send timeout expired") != nullptr) || (e.getType() == TTransportException::NOT_OPEN && - (strstr(e.what(), "write() send()") != nullptr || + // "TTransportException: Transport not open" can be from TSSLSocket.cpp + // when the underlying socket was closed. 
+ (strstr(e.what(), "TTransportException: Transport not open") || + strstr(e.what(), "write() send()") != nullptr || strstr(e.what(), "Called write on non-open socket") != nullptr || strstr(e.what(), "Socket send returned 0.") != nullptr)); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/runtime/backend-client.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h index fe9c9bc..dd7c61e 100644 --- a/be/src/runtime/backend-client.h +++ b/be/src/runtime/backend-client.h @@ -40,7 +40,38 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient { : ImpalaInternalServiceClient(iprot, oprot), transmit_csw_(NULL) { } - void TransmitData(TTransmitDataResult& _return, const TTransmitDataParams& params) { +/// We intentionally disable this clang warning as we intend to hide the +/// the same-named functions defined in the base class. +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Woverloaded-virtual" + + void ExecQueryFInstances(TExecQueryFInstancesResult& _return, + const TExecQueryFInstancesParams& params, bool* send_done) { + DCHECK(!*send_done); + ImpalaInternalServiceClient::send_ExecQueryFInstances(params); + *send_done = true; + ImpalaInternalServiceClient::recv_ExecQueryFInstances(_return); + } + + void ReportExecStatus(TReportExecStatusResult& _return, + const TReportExecStatusParams& params, bool* send_done) { + DCHECK(!*send_done); + ImpalaInternalServiceClient::send_ReportExecStatus(params); + *send_done = true; + ImpalaInternalServiceClient::recv_ReportExecStatus(_return); + } + + void CancelQueryFInstances(TCancelQueryFInstancesResult& _return, + const TCancelQueryFInstancesParams& params, bool* send_done) { + DCHECK(!*send_done); + ImpalaInternalServiceClient::send_CancelQueryFInstances(params); + *send_done = true; + ImpalaInternalServiceClient::recv_CancelQueryFInstances(_return); + } + + void 
TransmitData(TTransmitDataResult& _return, const TTransmitDataParams& params, + bool* send_done) { + DCHECK(!*send_done); FAULT_INJECTION_RPC_EXCEPTION(RPC_TRANSMITDATA, true /* is_send */); if (transmit_csw_ != NULL) { SCOPED_CONCURRENT_COUNTER(transmit_csw_); @@ -48,6 +79,7 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient { } else { ImpalaInternalServiceClient::send_TransmitData(params); } + *send_done = true; FAULT_INJECTION_RPC_EXCEPTION(RPC_TRANSMITDATA, false /* is_send */); ImpalaInternalServiceClient::recv_TransmitData(_return); } @@ -65,6 +97,24 @@ class ImpalaBackendClient : public ImpalaInternalServiceClient { transmit_csw_ = NULL; } + void UpdateFilter(TUpdateFilterResult& _return, const TUpdateFilterParams& params, + bool* send_done) { + DCHECK(!*send_done); + ImpalaInternalServiceClient::send_UpdateFilter(params); + *send_done = true; + ImpalaInternalServiceClient::recv_UpdateFilter(_return); + } + + void PublishFilter(TPublishFilterResult& _return, const TPublishFilterParams& params, + bool* send_done) { + DCHECK(!*send_done); + ImpalaInternalServiceClient::send_PublishFilter(params); + *send_done = true; + ImpalaInternalServiceClient::recv_PublishFilter(_return); + } + +#pragma clang diagnostic pop + private: RuntimeProfile::ConcurrentTimerCounter* transmit_csw_; }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/runtime/client-cache-types.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/client-cache-types.h b/be/src/runtime/client-cache-types.h index 6a18c25..df9a0e7 100644 --- a/be/src/runtime/client-cache-types.h +++ b/be/src/runtime/client-cache-types.h @@ -33,9 +33,9 @@ class ImpalaInternalServiceClient; typedef ClientCache<ImpalaInternalServiceClient> ImpalaInternalServiceClientCache; typedef ClientConnection<ImpalaInternalServiceClient> ImpalaInternalServiceConnection; -class CatalogServiceClient; -typedef ClientCache<CatalogServiceClient> 
CatalogServiceClientCache; -typedef ClientConnection<CatalogServiceClient> CatalogServiceConnection; +class CatalogServiceClientWrapper; +typedef ClientCache<CatalogServiceClientWrapper> CatalogServiceClientCache; +typedef ClientConnection<CatalogServiceClientWrapper> CatalogServiceConnection; class ImpalaBackendClient; typedef ClientCache<ImpalaBackendClient> ImpalaBackendClientCache; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/runtime/client-cache.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h index c9b557d..09f1446 100644 --- a/be/src/runtime/client-cache.h +++ b/be/src/runtime/client-cache.h @@ -26,6 +26,7 @@ #include <boost/bind.hpp> #include <gutil/strings/substitute.h> +#include "catalog/catalog-service-client-wrapper.h" #include "runtime/client-cache-types.h" #include "util/metrics.h" #include "rpc/thrift-client.h" @@ -237,20 +238,23 @@ class ClientConnection { /// closed cnxn). /// Application-level failures should be signalled through the response type. /// + /// TODO: Consider replacing 'retry_is_safe' with a bool which callers pass to + /// indicate intention to retry recv part of the RPC if it times out. 
template <class F, class Request, class Response> Status DoRpc(const F& f, const Request& request, Response* response, bool* retry_is_safe = NULL) { DCHECK(response != NULL); client_is_unrecoverable_ = true; if (retry_is_safe != nullptr) *retry_is_safe = false; + bool send_done = false; try { - (client_->*f)(*response, request); + (client_->*f)(*response, request, &send_done); } catch (const apache::thrift::transport::TTransportException& e) { - if (IsRecvTimeoutTException(e)) { + if (send_done && IsRecvTimeoutTException(e)) { return Status(TErrorCode::RPC_RECV_TIMEOUT, strings::Substitute( "Client $0 timed-out during recv call.", TNetworkAddressToString(address_))); } - if (IsSendFailTException(e)) { + if (!send_done && IsSendFailTException(e)) { return RetryRpc(f, request, response, retry_is_safe); } return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e)); @@ -315,13 +319,15 @@ class ClientConnection { return Status(TErrorCode::RPC_CLIENT_CONNECT_FAILURE, status.GetDetail()); } try { - (client_->*f)(*response, request); + bool send_done = false; + (client_->*f)(*response, request, &send_done); } catch (const apache::thrift::TException& e) { // By this point the RPC really has failed. // TODO: Revisit this logic later. It's possible that the new connection // works but we hit timeout here. 
return Status(TErrorCode::RPC_GENERAL_ERROR, ExceptionMsg(e)); } + client_is_unrecoverable_ = false; return Status::OK(); } }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/service/client-request-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 4fe7431..c454ebe 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -931,8 +931,8 @@ Status ClientRequestState::UpdateCatalog() { VLOG_QUERY << "Executing FinalizeDml() using CatalogService"; TUpdateCatalogResponse resp; - RETURN_IF_ERROR( - client.DoRpc(&CatalogServiceClient::UpdateCatalog, catalog_update, &resp)); + RETURN_IF_ERROR(client.DoRpc(&CatalogServiceClientWrapper::UpdateCatalog, + catalog_update, &resp)); Status status(resp.result.status); if (!status.ok()) LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/statestore/statestore-service-client-wrapper.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore-service-client-wrapper.h b/be/src/statestore/statestore-service-client-wrapper.h new file mode 100644 index 0000000..79068a2 --- /dev/null +++ b/be/src/statestore/statestore-service-client-wrapper.h @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef STATESTORE_STATESTORE_SERVICE_CLIENT_WRAPPER_H +#define STATESTORE_STATESTORE_SERVICE_CLIENT_WRAPPER_H + +#include "gen-cpp/StatestoreService.h" + +namespace impala { + +class StatestoreServiceClientWrapper : public StatestoreServiceClient { + public: + StatestoreServiceClientWrapper( + boost::shared_ptr<::apache::thrift::protocol::TProtocol> prot) + : StatestoreServiceClient(prot) { + } + + StatestoreServiceClientWrapper( + boost::shared_ptr<::apache::thrift::protocol::TProtocol> iprot, + boost::shared_ptr<::apache::thrift::protocol::TProtocol> oprot) + : StatestoreServiceClient(iprot, oprot) { + } + +/// We intentionally disable this clang warning as we intend to hide the +/// the same-named functions defined in the base class. 
+#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Woverloaded-virtual" + + void RegisterSubscriber(TRegisterSubscriberResponse& _return, + const TRegisterSubscriberRequest& params, bool* send_done) { + DCHECK(!*send_done); + send_RegisterSubscriber(params); + *send_done = true; + recv_RegisterSubscriber(_return); + } + +#pragma clang diagnostic pop +}; + +} +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/statestore/statestore-subscriber-client-wrapper.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore-subscriber-client-wrapper.h b/be/src/statestore/statestore-subscriber-client-wrapper.h new file mode 100644 index 0000000..2d40031 --- /dev/null +++ b/be/src/statestore/statestore-subscriber-client-wrapper.h @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef STATESTORE_STATESTORE_SUBSCRIBER_CLIENT_WRAPPER_H +#define STATESTORE_STATESTORE_SUBSCRIBER_CLIENT_WRAPPER_H + +#include "gen-cpp/StatestoreSubscriber.h" + +namespace impala { + +class StatestoreSubscriberClientWrapper : public StatestoreSubscriberClient { + public: + StatestoreSubscriberClientWrapper( + boost::shared_ptr<::apache::thrift::protocol::TProtocol> prot) + : StatestoreSubscriberClient(prot) { + } + + StatestoreSubscriberClientWrapper( + boost::shared_ptr<::apache::thrift::protocol::TProtocol> iprot, + boost::shared_ptr<::apache::thrift::protocol::TProtocol> oprot) + : StatestoreSubscriberClient(iprot, oprot) { + } + +/// We intentionally disable this clang warning as we intend to hide the +/// the same-named functions defined in the base class. +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Woverloaded-virtual" + + void Heartbeat(THeartbeatResponse& _return, const THeartbeatRequest& params, + bool* send_done) { + DCHECK(!*send_done); + send_Heartbeat(params); + *send_done = true; + recv_Heartbeat(_return); + } + + void UpdateState(TUpdateStateResponse& _return, const TUpdateStateRequest& params, + bool* send_done) { + DCHECK(!*send_done); + send_UpdateState(params); + *send_done = true; + recv_UpdateState(_return); + } + +#pragma clang diagnostic pop +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/statestore/statestore-subscriber.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc index 08c11ac..12efdcd 100644 --- a/be/src/statestore/statestore-subscriber.cc +++ b/be/src/statestore/statestore-subscriber.cc @@ -31,6 +31,7 @@ #include "gen-cpp/StatestoreService_types.h" #include "rpc/rpc-trace.h" #include "rpc/thrift-util.h" +#include "statestore/statestore-service-client-wrapper.h" #include "util/time.h" #include "util/debug-util.h" @@ -64,7 +65,7 @@ const 
string CALLBACK_METRIC_PATTERN = "statestore-subscriber.topic-$0.processin // statestore after a failure. const int32_t SLEEP_INTERVAL_MS = 5000; -typedef ClientConnection<StatestoreServiceClient> StatestoreConnection; +typedef ClientConnection<StatestoreServiceClientWrapper> StatestoreServiceConn; // Proxy class for the subscriber heartbeat thrift API, which // translates RPCs into method calls on the local subscriber object. @@ -142,7 +143,7 @@ Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id, Status StatestoreSubscriber::Register() { Status client_status; - StatestoreConnection client(client_cache_.get(), statestore_address_, &client_status); + StatestoreServiceConn client(client_cache_.get(), statestore_address_, &client_status); RETURN_IF_ERROR(client_status); TRegisterSubscriberRequest request; @@ -157,8 +158,8 @@ Status StatestoreSubscriber::Register() { request.subscriber_location = heartbeat_address_; request.subscriber_id = subscriber_id_; TRegisterSubscriberResponse response; - RETURN_IF_ERROR( - client.DoRpc(&StatestoreServiceClient::RegisterSubscriber, request, &response)); + RETURN_IF_ERROR(client.DoRpc(&StatestoreServiceClientWrapper::RegisterSubscriber, + request, &response)); Status status = Status(response.status); if (status.ok()) connected_to_statestore_metric_->set_value(true); if (response.__isset.registration_id) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/statestore/statestore-subscriber.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h index 1a24a76..65dcac9 100644 --- a/be/src/statestore/statestore-subscriber.h +++ b/be/src/statestore/statestore-subscriber.h @@ -29,6 +29,7 @@ #include "util/stopwatch.h" #include "rpc/thrift-util.h" #include "rpc/thrift-client.h" +#include "statestore/statestore-service-client-wrapper.h" #include "util/metrics.h" #include 
"gen-cpp/StatestoreService.h" #include "gen-cpp/StatestoreSubscriber.h" @@ -41,7 +42,7 @@ class Thread; class ThriftServer; class TNetworkAddress; -typedef ClientCache<StatestoreServiceClient> StatestoreClientCache; +typedef ClientCache<StatestoreServiceClientWrapper> StatestoreClientCache; /// A StatestoreSubscriber communicates with a statestore periodically through the exchange /// of topic update messages. These messages contain updates from the statestore to a list http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/statestore/statestore.cc ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index a19de2e..bd6361f 100644 --- a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -24,8 +24,9 @@ #include "common/status.h" #include "gen-cpp/StatestoreService_types.h" -#include "statestore/failure-detector.h" #include "rpc/thrift-util.h" +#include "statestore/failure-detector.h" +#include "statestore/statestore-subscriber-client-wrapper.h" #include "util/debug-util.h" #include "util/logging-support.h" #include "util/time.h" @@ -100,7 +101,7 @@ const int32_t STATESTORE_MAX_SUBSCRIBERS = 10000; // Updates or heartbeats that miss their deadline by this much are logged. 
const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000; -typedef ClientConnection<StatestoreSubscriberClient> StatestoreSubscriberConnection; +typedef ClientConnection<StatestoreSubscriberClientWrapper> StatestoreSubscriberConn; class StatestoreThriftIf : public StatestoreServiceIf { public: @@ -224,11 +225,11 @@ Statestore::Statestore(MetricGroup* metrics) FLAGS_statestore_num_heartbeat_threads, STATESTORE_MAX_SUBSCRIBERS, bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, true, _1, _2)), - update_state_client_cache_(new ClientCache<StatestoreSubscriberClient>(1, 0, + update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0, FLAGS_statestore_update_tcp_timeout_seconds * 1000, FLAGS_statestore_update_tcp_timeout_seconds * 1000, "", EnableInternalSslConnections())), - heartbeat_client_cache_(new ClientCache<StatestoreSubscriberClient>(1, 0, + heartbeat_client_cache_(new StatestoreSubscriberClientCache(1, 0, FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000, FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000, "", EnableInternalSslConnections())), @@ -422,13 +423,13 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) // Second: try and send it Status status; - StatestoreSubscriberConnection client(update_state_client_cache_.get(), + StatestoreSubscriberConn client(update_state_client_cache_.get(), subscriber->network_address(), &status); RETURN_IF_ERROR(status); TUpdateStateResponse response; RETURN_IF_ERROR(client.DoRpc( - &StatestoreSubscriberClient::UpdateState, update_state_request, &response)); + &StatestoreSubscriberClientWrapper::UpdateState, update_state_request, &response)); status = Status(response.status); if (!status.ok()) { @@ -602,7 +603,7 @@ Status Statestore::SendHeartbeat(Subscriber* subscriber) { sw.Start(); Status status; - StatestoreSubscriberConnection client(heartbeat_client_cache_.get(), + StatestoreSubscriberConn client(heartbeat_client_cache_.get(), subscriber->network_address(), &status); 
RETURN_IF_ERROR(status); @@ -610,7 +611,7 @@ Status Statestore::SendHeartbeat(Subscriber* subscriber) { THeartbeatResponse response; request.__set_registration_id(subscriber->registration_id()); RETURN_IF_ERROR( - client.DoRpc(&StatestoreSubscriberClient::Heartbeat, request, &response)); + client.DoRpc(&StatestoreSubscriberClientWrapper::Heartbeat, request, &response)); heartbeat_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0)); return Status::OK(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/statestore/statestore.h ---------------------------------------------------------------------- diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h index 91adb8d..44d9792 100644 --- a/be/src/statestore/statestore.h +++ b/be/src/statestore/statestore.h @@ -35,6 +35,7 @@ #include "runtime/client-cache.h" #include "runtime/timestamp-value.h" #include "statestore/failure-detector.h" +#include "statestore/statestore-subscriber-client-wrapper.h" #include "util/aligned-new.h" #include "util/collection-metrics.h" #include "util/metrics.h" @@ -45,6 +46,8 @@ namespace impala { class Status; +typedef ClientCache<StatestoreSubscriberClientWrapper> StatestoreSubscriberClientCache; + /// The Statestore is a soft-state key-value store that maintains a set of Topics, which /// are maps from string keys to byte array values. // @@ -400,13 +403,13 @@ class Statestore : public CacheLineAligned { /// Cache of subscriber clients used for UpdateState() RPCs. Only one client per /// subscriber should be used, but the cache helps with the client lifecycle on failure. - boost::scoped_ptr<ClientCache<StatestoreSubscriberClient>> update_state_client_cache_; + boost::scoped_ptr<StatestoreSubscriberClientCache> update_state_client_cache_; /// Cache of subscriber clients used for Heartbeat() RPCs. 
Separate from /// update_state_client_cache_ because we enable TCP-level timeouts for these calls, /// whereas they are not safe for UpdateState() RPCs which can take an unbounded amount /// of time. - boost::scoped_ptr<ClientCache<StatestoreSubscriberClient>> heartbeat_client_cache_; + boost::scoped_ptr<StatestoreSubscriberClientCache> heartbeat_client_cache_; /// Thrift API implementation which proxies requests onto this Statestore boost::shared_ptr<StatestoreServiceIf> thrift_iface_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/testutil/fault-injection-util.cc ---------------------------------------------------------------------- diff --git a/be/src/testutil/fault-injection-util.cc b/be/src/testutil/fault-injection-util.cc index cf418da..3a77d08 100644 --- a/be/src/testutil/fault-injection-util.cc +++ b/be/src/testutil/fault-injection-util.cc @@ -62,29 +62,35 @@ void FaultInjectionUtil::InjectRpcException(RpcCallType my_type, bool is_send) { DCHECK_EQ(target_rpc_type, RPC_TRANSMITDATA); if (is_send) { - int32_t count = send_count.Add(1); - if (count % 1024 == 0) { - if (xcp_type == RPC_EXCEPTION_LOST_CONNECTION_SEND) { - // Test both exception types which are considered recoverable - // by IsSendFailTException(). 
- if ((count / 1024) % 2 == 0) { + if (send_count.Add(1) % 1024 == 0) { + switch (xcp_type) { + case RPC_EXCEPTION_SEND_LOST_CONNECTION: throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket"); - } else { + case RPC_EXCEPTION_SEND_TIMEDOUT: throw TTransportException(TTransportException::TIMED_OUT, "send timeout expired"); - } - } else if (xcp_type == RPC_EXCEPTION_SSL_ERROR_SEND) { - throw TSSLException("SSL_write: SSL resource temporarily unavailable"); + case RPC_EXCEPTION_SSL_SEND_LOST_CONNECTION: + throw TTransportException(TTransportException::NOT_OPEN); + case RPC_EXCEPTION_SSL_SEND_TIMEDOUT: + throw TSSLException("SSL_write: Resource temporarily unavailable"); + // fall through for the default case. } } } else { if (recv_count.Add(1) % 1024 == 0) { - if (xcp_type == RPC_EXCEPTION_LOST_CONNECTION_RECV) { - throw TTransportException(TTransportException::NOT_OPEN, - "Called read on non-open socket"); - } else if (xcp_type == RPC_EXCEPTION_SSL_ERROR_RECV) { - throw TSSLException("SSL_read: SSL resource temporarily unavailable"); + switch (xcp_type) { + case RPC_EXCEPTION_RECV_LOST_CONNECTION: + throw TTransportException(TTransportException::NOT_OPEN, + "Called read on non-open socket"); + case RPC_EXCEPTION_RECV_TIMEDOUT: + throw TTransportException(TTransportException::TIMED_OUT, + "EAGAIN (timed out)"); + case RPC_EXCEPTION_SSL_RECV_LOST_CONNECTION: + throw TTransportException(TTransportException::NOT_OPEN); + case RPC_EXCEPTION_SSL_RECV_TIMEDOUT: + throw TSSLException("SSL_read: Resource temporarily unavailable"); + // fall through for the default case. 
} } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/be/src/testutil/fault-injection-util.h ---------------------------------------------------------------------- diff --git a/be/src/testutil/fault-injection-util.h b/be/src/testutil/fault-injection-util.h index 765c246..70a2375 100644 --- a/be/src/testutil/fault-injection-util.h +++ b/be/src/testutil/fault-injection-util.h @@ -39,10 +39,14 @@ class FaultInjectionUtil { enum RpcExceptionType { RPC_EXCEPTION_NONE = 0, - RPC_EXCEPTION_LOST_CONNECTION_SEND, - RPC_EXCEPTION_LOST_CONNECTION_RECV, - RPC_EXCEPTION_SSL_ERROR_SEND, - RPC_EXCEPTION_SSL_ERROR_RECV + RPC_EXCEPTION_SEND_LOST_CONNECTION, + RPC_EXCEPTION_SEND_TIMEDOUT, + RPC_EXCEPTION_RECV_LOST_CONNECTION, + RPC_EXCEPTION_RECV_TIMEDOUT, + RPC_EXCEPTION_SSL_SEND_LOST_CONNECTION, + RPC_EXCEPTION_SSL_SEND_TIMEDOUT, + RPC_EXCEPTION_SSL_RECV_LOST_CONNECTION, + RPC_EXCEPTION_SSL_RECV_TIMEDOUT, }; /// Test util function that injects delays to specified RPC server handling function http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/23565166/tests/custom_cluster/test_rpc_exception.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_rpc_exception.py b/tests/custom_cluster/test_rpc_exception.py index 6faea87..f7813e2 100644 --- a/tests/custom_cluster/test_rpc_exception.py +++ b/tests/custom_cluster/test_rpc_exception.py @@ -43,7 +43,7 @@ class TestRPCException(CustomClusterTestSuite): # Execute TEST_QUERY. If 'exception_string' is None, it's expected to complete # sucessfully with result matching EXPECTED_RESULT. Otherwise, it's expected # to fail with 'exception_string'. 
- def execute_query(self, exception_string): + def execute_test_query(self, exception_string): try: result = self.client.execute(self.TEST_QUERY) assert result.data == self.EXPECTED_RESULT @@ -54,24 +54,47 @@ class TestRPCException(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=1" " --fault_injection_rpc_type=5") - def test_transmitdata_send_fail(self, vector): - self.execute_query(None) + def test_transmitdata_send_lost_connection(self, vector): + self.execute_test_query(None) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=2" " --fault_injection_rpc_type=5") - def test_transmitdata_recv_fail(self, vector): - self.execute_query("Called read on non-open socket") + def test_transmitdata_send_timed_out(self, vector): + self.execute_test_query(None) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=3" " --fault_injection_rpc_type=5") - def test_transmitdata_secure_send_error(self, vector): - self.execute_query("SSL_write: SSL resource temporarily unavailable") + def test_transmitdata_recv_lost_connection(self, vector): + self.execute_test_query("Called read on non-open socket") @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=4" " --fault_injection_rpc_type=5") - def test_transmitdata_secure_recv_error(self, vector): - self.execute_query("SSL_read: SSL resource temporarily unavailable") + def test_transmitdata_recv_timed_out(self, vector): + self.execute_test_query(None) + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5" + " --fault_injection_rpc_type=5") + def test_transmitdata_secure_send_lost_connection(self, vector): + self.execute_test_query(None); + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=6" + " 
--fault_injection_rpc_type=5") + def test_transmitdata_secure_send_timed_out(self, vector): + self.execute_test_query("SSL_write: Resource temporarily unavailable") + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=7" + " --fault_injection_rpc_type=5") + def test_transmitdata_secure_recv_lost_connection(self, vector): + self.execute_test_query("TTransportException: Transport not open") + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=8" + " --fault_injection_rpc_type=5") + def test_transmitdata_secure_recv_timed_out(self, vector): + self.execute_test_query(None)
