This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push: new b1a985be5 IMPALA-13680: Avoid flush() when closing TSSLSocket b1a985be5 is described below commit b1a985be5eb49db6f23912a1439eeb59d74a278e Author: Csaba Ringhofer <csringho...@cloudera.com> AuthorDate: Tue Jan 21 10:36:12 2025 +0100 IMPALA-13680: Avoid flush() when closing TSSLSocket Closing the transports could hang in TAcceptQueueServer if there was an error during SSL handshake. As the TSSLSocket is wrapped in TBufferedTransport and TBufferedTransport::close() calls flush(), TSSLSocket::flush() was also called, which led to retrying the handshake in an unclean state. This led to hanging indefinitely with OpenSSL 3.2. Another potential error is that if flush() throws an exception then the underlying TTransport's close() won't be called. Ideally this would be solved in Thrift (THRIFT-5846). As a quick fix this change adds a subclass for TBufferedTransport that doesn't call flush(). This is safe to do as generated TProcessor subclasses call flush() every time the client/server sends a message. 
Testing: - the issue was caught by thrift-server-test/KerberosOnAndOff and TestClientSsl::test_ssl hanging till killed Change-Id: I4879a1567f7691711d73287269bf87f2946e75d2 Reviewed-on: http://gerrit.cloudera.org:8080/22368 Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Reviewed-by: Zoltan Borok-Nagy <borokna...@cloudera.com> --- be/src/rpc/auth-provider.h | 3 ++- be/src/rpc/authentication.cc | 8 ++++---- be/src/rpc/thrift-client.h | 2 +- be/src/rpc/thrift-server.h | 20 +++++++++++++++++++- be/src/transport/TSaslServerTransport.cpp | 4 ++-- 5 files changed, 28 insertions(+), 9 deletions(-) diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h index 1f2ab776a..9f1a1420a 100644 --- a/be/src/rpc/auth-provider.h +++ b/be/src/rpc/auth-provider.h @@ -42,7 +42,8 @@ class AuthProvider { /// Creates a new Thrift transport factory in the out parameter that performs /// authorisation per this provider's protocol. The type of the transport returned is /// determined by 'underlying_transport_type' and there may be multiple levels of - /// wrapped transports, eg. a TBufferedTransport around a TSaslServerTransport. + /// wrapped transports, eg. a ThriftServer::BufferedTransport around a + /// TSaslServerTransport. 
virtual Status GetServerTransportFactory( ThriftServer::TransportType underlying_transport_type, const std::string& server_name, MetricGroup* metrics, diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc index 020a41356..8b022b7fd 100644 --- a/be/src/rpc/authentication.cc +++ b/be/src/rpc/authentication.cc @@ -1493,8 +1493,8 @@ void SecureAuthProvider::SetupConnectionContext( TSocket* socket = nullptr; switch (underlying_transport_type) { case ThriftServer::BINARY: { - TBufferedTransport* buffered_transport = - down_cast<TBufferedTransport*>(input_transport); + ThriftServer::BufferedTransport* buffered_transport = + down_cast<ThriftServer::BufferedTransport*>(input_transport); TSaslServerTransport* sasl_transport = down_cast<TSaslServerTransport*>( buffered_transport->getUnderlyingTransport().get()); socket = down_cast<TSocket*>(sasl_transport->getUnderlyingTransport().get()); @@ -1605,8 +1605,8 @@ void NoAuthProvider::SetupConnectionContext( TSocket* socket = nullptr; switch (underlying_transport_type) { case ThriftServer::BINARY: { - TBufferedTransport* buffered_transport = - down_cast<TBufferedTransport*>(input_transport); + ThriftServer::BufferedTransport* buffered_transport = + down_cast<ThriftServer::BufferedTransport*>(input_transport); socket = down_cast<TSocket*>(buffered_transport->getUnderlyingTransport().get()); break; } diff --git a/be/src/rpc/thrift-client.h b/be/src/rpc/thrift-client.h index a88fe300a..7cb0dbe14 100644 --- a/be/src/rpc/thrift-client.h +++ b/be/src/rpc/thrift-client.h @@ -154,7 +154,7 @@ ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port } // transport_ is created by wrapping the socket_ in the TTransport provided by the - // auth_provider_ and then a TBufferedTransport (IMPALA-1928). + // auth_provider_ and then a ThriftServer::BufferedTransport (IMPALA-1928). 
transport_ = socket_; init_status_ = auth_provider_->WrapClientTransport(address_.hostname, transport_, service_name, &transport_); diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h index 75488f78b..820526ca1 100644 --- a/be/src/rpc/thrift-server.h +++ b/be/src/rpc/thrift-server.h @@ -59,6 +59,24 @@ class AuthProvider; /// TODO: shutdown is buggy (which only harms tests) class ThriftServer { public: + + /// Override TBufferedTransport::close() to avoid calling flush() which is not safe + /// in TSSLSocket if it is in error state. See IMPALA-13680 / THRIFT-5846 for details. + class BufferedTransport : + public apache::thrift::transport::TBufferedTransport { + + public: + BufferedTransport(std::shared_ptr<apache::thrift::transport::TTransport>& transport, + uint32_t sz, std::shared_ptr<apache::thrift::TConfiguration>&& config) + : apache::thrift::transport::TBufferedTransport(transport, sz, config) {} + + // base implementation: + // https://github.com/apache/thrift/blob/d078721e44fea7713832ae5d0f5d9ca67317f19e/lib/cpp/src/thrift/transport/TBufferTransports.h#L367 + virtual void close() override { + transport_->close(); + } + }; + /// Transport factory that wraps transports in a buffered transport with a customisable /// buffer-size and optionally in another transport from a provided factory. 
A larger /// buffer is usually more efficient, as it allows the underlying transports to perform @@ -81,7 +99,7 @@ class ThriftServer { VerifyMaxMessageSizeInheritance(trans.get(), wrapped.get()); std::shared_ptr<apache::thrift::transport::TTransport> buffered_wrapped = std::shared_ptr<apache::thrift::transport::TTransport>( - new apache::thrift::transport::TBufferedTransport( + new BufferedTransport( wrapped, buffer_size_, wrapped->getConfiguration())); VerifyMaxMessageSizeInheritance(wrapped.get(), buffered_wrapped.get()); return buffered_wrapped; diff --git a/be/src/transport/TSaslServerTransport.cpp b/be/src/transport/TSaslServerTransport.cpp index 3018ab85b..082929440 100644 --- a/be/src/transport/TSaslServerTransport.cpp +++ b/be/src/transport/TSaslServerTransport.cpp @@ -116,7 +116,7 @@ std::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport( // to be the same so that the authentication state is identical for communication in // both directions. In order to do this, we share the same TTransport object for both // input and output set in TAcceptQueueServer::SetupConnection. - std::shared_ptr<TBufferedTransport> ret_transport; + std::shared_ptr<impala::ThriftServer::BufferedTransport> ret_transport; std::shared_ptr<TTransport> wrapped( new TSaslServerTransport(serverDefinitionMap_, trans)); // Verify the max message size is inherited properly @@ -126,7 +126,7 @@ std::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport( TSocket* socket = static_cast<TSocket*>(trans.get()); socket->setRecvTimeout(FLAGS_sasl_connect_tcp_timeout_ms); socket->setSendTimeout(FLAGS_sasl_connect_tcp_timeout_ms); - ret_transport.reset(new TBufferedTransport(wrapped, + ret_transport.reset(new impala::ThriftServer::BufferedTransport(wrapped, impala::ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES, wrapped->getConfiguration())); impala::VerifyMaxMessageSizeInheritance(wrapped.get(), ret_transport.get());