This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b1a985be5 IMPALA-13680: Avoid flush() when closing TSSLSocket
b1a985be5 is described below

commit b1a985be5eb49db6f23912a1439eeb59d74a278e
Author: Csaba Ringhofer <csringho...@cloudera.com>
AuthorDate: Tue Jan 21 10:36:12 2025 +0100

    IMPALA-13680: Avoid flush() when closing TSSLSocket
    
    Closing the transports could hang in TAcceptQueueServer if there was
    an error during SSL handshake. As the TSSLSocket is wrapped in
    TBufferedTransport and TBufferedTransport::close() calls flush(),
    TSSLSocket::flush() was also called, which led to retrying the
    handshake in an unclean state. This led to hanging indefinitely with
    OpenSSL 3.2. Another potential error is that if flush() throws an
    exception, then the underlying TTransport's close() won't be called.
    
    Ideally this would be solved in Thrift (THRIFT-5846). As a quick
    fix, this change adds a subclass of TBufferedTransport that doesn't
    call flush() in close(). This is safe to do as generated TProcessor
    subclasses call flush() every time the client/server sends
    a message.
    
    Testing:
    - the issue was caught by thrift-server-test/KerberosOnAndOff
      and TestClientSsl::test_ssl hanging till killed
    
    Change-Id: I4879a1567f7691711d73287269bf87f2946e75d2
    Reviewed-on: http://gerrit.cloudera.org:8080/22368
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Reviewed-by: Zoltan Borok-Nagy <borokna...@cloudera.com>
---
 be/src/rpc/auth-provider.h                |  3 ++-
 be/src/rpc/authentication.cc              |  8 ++++----
 be/src/rpc/thrift-client.h                |  2 +-
 be/src/rpc/thrift-server.h                | 20 +++++++++++++++++++-
 be/src/transport/TSaslServerTransport.cpp |  4 ++--
 5 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index 1f2ab776a..9f1a1420a 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -42,7 +42,8 @@ class AuthProvider {
   /// Creates a new Thrift transport factory in the out parameter that performs
   /// authorisation per this provider's protocol. The type of the transport returned is
   /// determined by 'underlying_transport_type' and there may be multiple levels of
-  /// wrapped transports, eg. a TBufferedTransport around a TSaslServerTransport.
+  /// wrapped transports, eg. a ThriftServer::BufferedTransport around a
+  /// TSaslServerTransport.
   virtual Status GetServerTransportFactory(
       ThriftServer::TransportType underlying_transport_type,
       const std::string& server_name, MetricGroup* metrics,
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index 020a41356..8b022b7fd 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -1493,8 +1493,8 @@ void SecureAuthProvider::SetupConnectionContext(
   TSocket* socket = nullptr;
   switch (underlying_transport_type) {
     case ThriftServer::BINARY: {
-      TBufferedTransport* buffered_transport =
-          down_cast<TBufferedTransport*>(input_transport);
+      ThriftServer::BufferedTransport* buffered_transport =
+          down_cast<ThriftServer::BufferedTransport*>(input_transport);
       TSaslServerTransport* sasl_transport = down_cast<TSaslServerTransport*>(
           buffered_transport->getUnderlyingTransport().get());
       socket = down_cast<TSocket*>(sasl_transport->getUnderlyingTransport().get());
@@ -1605,8 +1605,8 @@ void NoAuthProvider::SetupConnectionContext(
   TSocket* socket = nullptr;
   switch (underlying_transport_type) {
     case ThriftServer::BINARY: {
-      TBufferedTransport* buffered_transport =
-          down_cast<TBufferedTransport*>(input_transport);
+      ThriftServer::BufferedTransport* buffered_transport =
+          down_cast<ThriftServer::BufferedTransport*>(input_transport);
       socket = down_cast<TSocket*>(buffered_transport->getUnderlyingTransport().get());
       break;
     }
diff --git a/be/src/rpc/thrift-client.h b/be/src/rpc/thrift-client.h
index a88fe300a..7cb0dbe14 100644
--- a/be/src/rpc/thrift-client.h
+++ b/be/src/rpc/thrift-client.h
@@ -154,7 +154,7 @@ ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port
   }
 
   // transport_ is created by wrapping the socket_ in the TTransport provided by the
-  // auth_provider_ and then a TBufferedTransport (IMPALA-1928).
+  // auth_provider_ and then a ThriftServer::BufferedTransport (IMPALA-1928).
   transport_ = socket_;
   init_status_ = auth_provider_->WrapClientTransport(address_.hostname, transport_,
       service_name, &transport_);
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index 75488f78b..820526ca1 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -59,6 +59,24 @@ class AuthProvider;
 /// TODO: shutdown is buggy (which only harms tests)
 class ThriftServer {
  public:
+
+   /// Override TBufferedTransport::close() to avoid calling flush() which is not safe
+   /// in TSSLSocket if it is in error state. See IMPALA-13680 / THRIFT-5846 for details.
+   class BufferedTransport :
+      public apache::thrift::transport::TBufferedTransport {
+
+    public:
+      BufferedTransport(std::shared_ptr<apache::thrift::transport::TTransport>& transport,
+          uint32_t sz, std::shared_ptr<apache::thrift::TConfiguration>&& config)
+      : apache::thrift::transport::TBufferedTransport(transport, sz, config) {}
+
+      // base implementation:
+      // https://github.com/apache/thrift/blob/d078721e44fea7713832ae5d0f5d9ca67317f19e/lib/cpp/src/thrift/transport/TBufferTransports.h#L367
+      virtual void close() override {
+        transport_->close();
+      }
+  };
+
   /// Transport factory that wraps transports in a buffered transport with a customisable
   /// buffer-size and optionally in another transport from a provided factory. A larger
   /// buffer is usually more efficient, as it allows the underlying transports to perform
@@ -81,7 +99,7 @@ class ThriftServer {
       VerifyMaxMessageSizeInheritance(trans.get(), wrapped.get());
       std::shared_ptr<apache::thrift::transport::TTransport> buffered_wrapped =
           std::shared_ptr<apache::thrift::transport::TTransport>(
-              new apache::thrift::transport::TBufferedTransport(
+              new BufferedTransport(
                   wrapped, buffer_size_, wrapped->getConfiguration()));
       VerifyMaxMessageSizeInheritance(wrapped.get(), buffered_wrapped.get());
       return buffered_wrapped;
diff --git a/be/src/transport/TSaslServerTransport.cpp b/be/src/transport/TSaslServerTransport.cpp
index 3018ab85b..082929440 100644
--- a/be/src/transport/TSaslServerTransport.cpp
+++ b/be/src/transport/TSaslServerTransport.cpp
@@ -116,7 +116,7 @@ std::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport(
   // to be the same so that the authentication state is identical for communication in
   // both directions. In order to do this, we share the same TTransport object for both
   // input and output set in TAcceptQueueServer::SetupConnection.
-  std::shared_ptr<TBufferedTransport> ret_transport;
+  std::shared_ptr<impala::ThriftServer::BufferedTransport> ret_transport;
   std::shared_ptr<TTransport> wrapped(
       new TSaslServerTransport(serverDefinitionMap_, trans));
   // Verify the max message size is inherited properly
@@ -126,7 +126,7 @@ std::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport(
   TSocket* socket = static_cast<TSocket*>(trans.get());
   socket->setRecvTimeout(FLAGS_sasl_connect_tcp_timeout_ms);
   socket->setSendTimeout(FLAGS_sasl_connect_tcp_timeout_ms);
-  ret_transport.reset(new TBufferedTransport(wrapped,
+  ret_transport.reset(new impala::ThriftServer::BufferedTransport(wrapped,
       
impala::ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
           wrapped->getConfiguration()));
   impala::VerifyMaxMessageSizeInheritance(wrapped.get(), ret_transport.get());

Reply via email to