This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new eb03be663a [KYUUBI #6594] Port HIVE-26633: Make thrift client 
maxMessageSize configurable
eb03be663a is described below

commit eb03be663af4f644cd5c2ddc2212d2ec6ed97a42
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Aug 27 11:00:53 2024 +0800

    [KYUUBI #6594] Port HIVE-26633: Make thrift client maxMessageSize configurable
    
    Fix #6594.
    
    This PR ports HIVE-26633 (https://github.com/apache/hive/pull/3674), "Make
    thrift client maxMessageSize configurable", to fix a regression introduced
    by the upgrade to Thrift 0.16 in 1.9.0.
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ---
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6631 from pan3793/thrift-max-size.
    
    Closes #6594
    
    e4841c88e [Cheng Pan] [KYUUBI #6594] Port HIVE-26633: Make thrift client 
maxMessageSize configurable
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 11de72f1179dc3f217d55c89644f7120d8dbc7ec)
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/configuration/settings.md                     |  1 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  7 +++
 .../kyuubi/jdbc/hive/JdbcConnectionParams.java     |  1 +
 .../apache/kyuubi/jdbc/hive/KyuubiConnection.java  | 45 +++++++++++++--
 .../apache/kyuubi/jdbc/hive/auth/ThriftUtils.java  | 66 +++++++++++++++++++---
 .../kyuubi/client/KyuubiSyncThriftClient.scala     | 18 ++++--
 6 files changed, 121 insertions(+), 17 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 0ea5ee5f05..8d468513c5 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -259,6 +259,7 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.frontend.thrift.binary.ssl.disallowed.protocols | SSLv2,SSLv3        
| SSL versions to disallow for Kyuubi thrift binary frontend.                   
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
 | kyuubi.frontend.thrift.binary.ssl.enabled              | false              
| Set this to true for using SSL encryption in thrift binary frontend server.   
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
 | kyuubi.frontend.thrift.binary.ssl.include.ciphersuites                     
|| A comma-separated list of include SSL cipher suite names for thrift binary 
frontend.                                                                       
                                                                                
                                                                                
                                                                                
                  [...]
+| kyuubi.frontend.thrift.client.max.message.size         | 1073741824         
| Maximum message size in bytes a thrift client will receive.                   
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
 | kyuubi.frontend.thrift.http.bind.host                  | &lt;undefined&gt;  
| Hostname or IP of the machine on which to run the thrift frontend service via 
http protocol.                                                                  
                                                                                
                                                                                
                                                                                
               [...]
 | kyuubi.frontend.thrift.http.bind.port                  | 10010              
| Port of the machine on which to run the thrift frontend service via http 
protocol.                                                                       
                                                                                
                                                                                
                                                                                
                    [...]
 | kyuubi.frontend.thrift.http.compression.enabled        | true               
| Enable thrift http compression via Jetty compression support                  
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 5e23c0fe63..49cc60cbc6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -605,6 +605,13 @@ object KyuubiConf {
       .version("1.4.0")
       .fallbackConf(FRONTEND_MAX_MESSAGE_SIZE)
 
+  val FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE: ConfigEntry[Int] =
+    buildConf("kyuubi.frontend.thrift.client.max.message.size")
+      .doc("Maximum message size in bytes a thrift client will receive.")
+      .version("1.9.3")
+      .intConf
+      .createWithDefault(1 * 1024 * 1024 * 1024) // follow HIVE-26633 to use 
1g as default value
+
   val FRONTEND_THRIFT_HTTP_REQUEST_HEADER_SIZE: ConfigEntry[Int] =
     buildConf("kyuubi.frontend.thrift.http.request.header.size")
       .doc("Request header size in bytes, when using HTTP transport mode. 
Jetty defaults used.")
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
index 9aba2a813f..74eb71a006 100644
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
@@ -94,6 +94,7 @@ public class JdbcConnectionParams {
 
   static final String CONNECT_TIMEOUT = "connectTimeout";
   static final String SOCKET_TIMEOUT = "socketTimeout";
+  static final String THRIFT_CLIENT_MAX_MESSAGE_SIZE = 
"thrift.client.max.message.size";
 
   // We support ways to specify application name modeled after some existing 
DBs, since
   // there's no standard approach.
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
index de4e5c3532..a023c3bffa 100644
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
@@ -66,6 +66,7 @@ import org.apache.kyuubi.jdbc.hive.cli.RowSet;
 import org.apache.kyuubi.jdbc.hive.cli.RowSetFactory;
 import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable;
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.*;
+import org.apache.kyuubi.shaded.thrift.TConfiguration;
 import org.apache.kyuubi.shaded.thrift.TException;
 import org.apache.kyuubi.shaded.thrift.protocol.TBinaryProtocol;
 import org.apache.kyuubi.shaded.thrift.transport.THttpClient;
@@ -418,7 +419,13 @@ public class KyuubiConnection implements SQLConnection, 
KyuubiLoggable {
     boolean useSsl = isSslConnection();
     // Create an http client from the configs
     httpClient = getHttpClient(useSsl);
-    transport = new THttpClient(getServerHttpUrl(useSsl), httpClient);
+    int maxMessageSize = getMaxMessageSize();
+    TConfiguration.Builder tConfBuilder = TConfiguration.custom();
+    if (maxMessageSize > 0) {
+      tConfBuilder.setMaxMessageSize(maxMessageSize);
+    }
+    TConfiguration tConf = tConfBuilder.build();
+    transport = new THttpClient(tConf, getServerHttpUrl(useSsl), httpClient);
     return transport;
   }
 
@@ -582,7 +589,8 @@ public class KyuubiConnection implements SQLConnection, 
KyuubiLoggable {
   }
 
   /** Create underlying SSL or non-SSL transport */
-  private TTransport createUnderlyingTransport() throws TTransportException {
+  private TTransport createUnderlyingTransport() throws TTransportException, 
SQLException {
+    int maxMessageSize = getMaxMessageSize();
     TTransport transport = null;
     // Note: Thrift returns an SSL socket that is already bound to the 
specified host:port
     // Therefore an open called on this would be a no-op later
@@ -596,19 +604,46 @@ public class KyuubiConnection implements SQLConnection, 
KyuubiLoggable {
           Utils.getPassword(sessConfMap, 
JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);
 
       if (sslTrustStore == null || sslTrustStore.isEmpty()) {
-        transport = ThriftUtils.getSSLSocket(host, port, connectTimeout, 
socketTimeout);
+        transport =
+            ThriftUtils.getSSLSocket(host, port, connectTimeout, 
socketTimeout, maxMessageSize);
       } else {
         transport =
             ThriftUtils.getSSLSocket(
-                host, port, connectTimeout, socketTimeout, sslTrustStore, 
sslTrustStorePassword);
+                host,
+                port,
+                connectTimeout,
+                socketTimeout,
+                sslTrustStore,
+                sslTrustStorePassword,
+                maxMessageSize);
       }
     } else {
       // get non-SSL socket transport
-      transport = ThriftUtils.getSocketTransport(host, port, connectTimeout, 
socketTimeout);
+      transport =
+          ThriftUtils.getSocketTransport(host, port, connectTimeout, 
socketTimeout, maxMessageSize);
     }
     return transport;
   }
 
+  private int getMaxMessageSize() throws SQLException {
+    String maxMessageSize = 
sessConfMap.get(JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE);
+    if (maxMessageSize == null) {
+      return -1;
+    }
+
+    try {
+      return Integer.parseInt(maxMessageSize);
+    } catch (Exception e) {
+      String errFormat =
+          "Invalid {} configuration of '{}'. Expected an integer specifying 
number of bytes. "
+              + "A configuration of <= 0 uses default max message size.";
+      String errMsg =
+          String.format(
+              errFormat, JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE, 
maxMessageSize);
+      throw new SQLException(errMsg, "42000", e);
+    }
+  }
+
   /**
    * Create transport per the connection options Supported transport options 
are: - SASL based
    * transports over + Kerberos + SSL + non-SSL - Raw (non-SASL) socket
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
index 7f0099b299..331b871e08 100644
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
@@ -24,23 +24,70 @@ import 
org.apache.kyuubi.shaded.thrift.transport.TSSLTransportFactory;
 import org.apache.kyuubi.shaded.thrift.transport.TSocket;
 import org.apache.kyuubi.shaded.thrift.transport.TTransport;
 import org.apache.kyuubi.shaded.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class helps in some aspects of authentication. It creates the proper 
Thrift classes for the
  * given configuration as well as helps with authenticating requests.
  */
 public class ThriftUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ThriftUtils.class);
+
+  /**
+   * Configure the provided T transport's max message size.
+   *
+   * @param transport Transport to configure maxMessage for
+   * @param maxMessageSize Maximum allowed message size in bytes, less than or 
equal to 0 means use
+   *     the Thrift library default.
+   * @return The passed in T transport configured with desired max message 
size. The same object
+   *     passed in is returned.
+   */
+  public static <T extends TTransport> T configureThriftMaxMessageSize(
+      T transport, int maxMessageSize) {
+    if (maxMessageSize > 0) {
+      if (transport.getConfiguration() == null) {
+        LOG.warn(
+            "TTransport {} is returning a null Configuration, Thrift max 
message size is not getting configured",
+            transport.getClass().getName());
+        return transport;
+      }
+      transport.getConfiguration().setMaxMessageSize(maxMessageSize);
+    }
+    return transport;
+  }
+
+  /**
+   * Create a TSocket for the provided host and port with specified 
connectTimeout, socketTimeout and
+   * maxMessageSize.
+   *
+   * @param host Host to connect to.
+   * @param port Port to connect to.
+   * @param connectTimeout Socket connect timeout (0 means no timeout).
+   * @param socketTimeout Socket read/write timeout (0 means no timeout).
+   * @param maxMessageSize Size in bytes for max allowable Thrift message 
size, less than or equal
+   *     to 0 results in using the Thrift library default.
+   * @return TTransport TSocket for host/port
+   */
   public static TTransport getSocketTransport(
-      String host, int port, int connectTimeout, int socketTimeout) throws 
TTransportException {
-    return new TSocket(TConfiguration.DEFAULT, host, port, socketTimeout, 
connectTimeout);
+      String host, int port, int connectTimeout, int socketTimeout, int 
maxMessageSize)
+      throws TTransportException {
+    TConfiguration.Builder tConfBuilder = TConfiguration.custom();
+    if (maxMessageSize > 0) {
+      tConfBuilder.setMaxMessageSize(maxMessageSize);
+    }
+    TConfiguration tConf = tConfBuilder.build();
+    return new TSocket(tConf, host, port, socketTimeout, connectTimeout);
   }
 
   public static TTransport getSSLSocket(
-      String host, int port, int connectTimeout, int socketTimeout) throws 
TTransportException {
+      String host, int port, int connectTimeout, int socketTimeout, int 
maxMessageSize)
+      throws TTransportException {
     // The underlying SSLSocket object is bound to host:port with the given 
SO_TIMEOUT
     TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, 
socketTimeout);
     tSSLSocket.setConnectTimeout(connectTimeout);
-    return getSSLSocketWithHttps(tSSLSocket);
+    return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
   }
 
   public static TTransport getSSLSocket(
@@ -49,7 +96,8 @@ public class ThriftUtils {
       int connectTimeout,
       int socketTimeout,
       String trustStorePath,
-      String trustStorePassWord)
+      String trustStorePassWord,
+      int maxMessageSize)
       throws TTransportException {
     TSSLTransportFactory.TSSLTransportParameters params =
         new TSSLTransportFactory.TSSLTransportParameters();
@@ -59,16 +107,18 @@ public class ThriftUtils {
     // SSLContext created with the given params
     TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, 
socketTimeout, params);
     tSSLSocket.setConnectTimeout(connectTimeout);
-    return getSSLSocketWithHttps(tSSLSocket);
+    return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
   }
 
   // Using endpoint identification algorithm as HTTPS enables us to do
   // CNAMEs/subjectAltName verification
-  private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws 
TTransportException {
+  private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket, int 
maxMessageSize)
+      throws TTransportException {
     SSLSocket sslSocket = (SSLSocket) tSSLSocket.getSocket();
     SSLParameters sslParams = sslSocket.getSSLParameters();
     sslParams.setEndpointIdentificationAlgorithm("HTTPS");
     sslSocket.setSSLParameters(sslParams);
-    return new TSocket(sslSocket);
+    TSocket tSocket = new TSocket(sslSocket);
+    return configureThriftMaxMessageSize(tSocket, maxMessageSize);
   }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index d34458c647..c36e9ec06c 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -470,8 +470,10 @@ private[kyuubi] object KyuubiSyncThriftClient extends 
Logging {
       host: String,
       port: Int,
       socketTimeout: Int,
-      connectionTimeout: Int): TProtocol = {
-    val tSocket = new TSocket(TConfiguration.DEFAULT, host, port, 
socketTimeout, connectionTimeout)
+      connectionTimeout: Int,
+      maxMessageSize: Int): TProtocol = {
+    val tConf = 
TConfiguration.custom().setMaxMessageSize(maxMessageSize).build()
+    val tSocket = new TSocket(tConf, host, port, socketTimeout, 
connectionTimeout)
     val tTransport = PlainSASLHelper.getPlainTransport(user, passwd, tSocket)
     tTransport.open()
     new TBinaryProtocol(tTransport)
@@ -485,15 +487,23 @@ private[kyuubi] object KyuubiSyncThriftClient extends 
Logging {
       conf: KyuubiConf): KyuubiSyncThriftClient = {
     val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
     val loginTimeout = conf.get(ENGINE_LOGIN_TIMEOUT).toInt
+    val maxMessageSize = 
conf.get(KyuubiConf.FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE)
     val aliveProbeEnabled = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
     val aliveProbeInterval = 
conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL).toInt
     val aliveTimeout = conf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
 
-    val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout)
+    val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout, 
maxMessageSize)
 
     val aliveProbeProtocol =
       if (aliveProbeEnabled) {
-        Option(createTProtocol(user, passwd, host, port, aliveProbeInterval, 
loginTimeout))
+        Some(createTProtocol(
+          user,
+          passwd,
+          host,
+          port,
+          aliveProbeInterval,
+          loginTimeout,
+          maxMessageSize))
       } else {
         None
       }

Reply via email to