This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 11de72f117 [KYUUBI #6594] Port HIVE-26633: Make thrift client
maxMessageSize configurable
11de72f117 is described below
commit 11de72f1179dc3f217d55c89644f7120d8dbc7ec
Author: Cheng Pan <[email protected]>
AuthorDate: Tue Aug 27 11:00:53 2024 +0800
[KYUUBI #6594] Port HIVE-26633: Make thrift client maxMessageSize
configurable
# :mag: Description
Fix #6594.
This PR ports HIVE-26633 (https://github.com/apache/hive/pull/3674): Make
thrift client maxMessageSize configurable, to fix a regression after upgrading
to Thrift 0.16 in 1.9.0.
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6631 from pan3793/thrift-max-size.
Closes #6594
e4841c88e [Cheng Pan] [KYUUBI #6594] Port HIVE-26633: Make thrift client
maxMessageSize configurable
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/configuration/settings.md | 1 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 7 +++
.../kyuubi/jdbc/hive/JdbcConnectionParams.java | 1 +
.../apache/kyuubi/jdbc/hive/KyuubiConnection.java | 45 +++++++++++++--
.../apache/kyuubi/jdbc/hive/auth/ThriftUtils.java | 66 +++++++++++++++++++---
.../kyuubi/client/KyuubiSyncThriftClient.scala | 18 ++++--
6 files changed, 121 insertions(+), 17 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 2b9a71f10c..68014a5cf3 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -274,6 +274,7 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.frontend.thrift.binary.ssl.disallowed.protocols | SSLv2,SSLv3
| SSL versions to disallow for Kyuubi thrift binary frontend.
[...]
| kyuubi.frontend.thrift.binary.ssl.enabled | false
| Set this to true for using SSL encryption in thrift binary frontend
server.
[...]
| kyuubi.frontend.thrift.binary.ssl.include.ciphersuites
|| A comma-separated list of include SSL cipher suite names for thrift binary
frontend.
[...]
+| kyuubi.frontend.thrift.client.max.message.size | 1073741824
| Maximum message size in bytes a thrift client will receive.
[...]
| kyuubi.frontend.thrift.http.bind.host |
<undefined> | Hostname or IP of the machine on which to run the thrift
frontend service via http protocol.
[...]
| kyuubi.frontend.thrift.http.bind.port | 10010
| Port of the machine on which to run the thrift frontend service via http
protocol.
[...]
| kyuubi.frontend.thrift.http.compression.enabled | true
| Enable thrift http compression via Jetty compression support
[...]
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index c265f92eb8..a88b5f615e 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -665,6 +665,13 @@ object KyuubiConf {
.version("1.4.0")
.fallbackConf(FRONTEND_MAX_MESSAGE_SIZE)
+ val FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE: ConfigEntry[Int] =
+ buildConf("kyuubi.frontend.thrift.client.max.message.size")
+ .doc("Maximum message size in bytes a thrift client will receive.")
+ .version("1.9.3")
+ .intConf
+ .createWithDefault(1 * 1024 * 1024 * 1024) // follow HIVE-26633 to use
1g as default value
+
val FRONTEND_THRIFT_HTTP_REQUEST_HEADER_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.frontend.thrift.http.request.header.size")
.doc("Request header size in bytes, when using HTTP transport mode.
Jetty defaults used.")
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
index cd9fd517ef..b3884c694f 100644
---
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
@@ -97,6 +97,7 @@ public class JdbcConnectionParams {
static final String CONNECT_TIMEOUT = "connectTimeout";
static final String SOCKET_TIMEOUT = "socketTimeout";
+ static final String THRIFT_CLIENT_MAX_MESSAGE_SIZE =
"thrift.client.max.message.size";
// We support ways to specify application name modeled after some existing
DBs, since
// there's no standard approach.
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
index 4c39fb308a..eaf71cfa1b 100644
---
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
@@ -67,6 +67,7 @@ import org.apache.kyuubi.jdbc.hive.cli.RowSet;
import org.apache.kyuubi.jdbc.hive.cli.RowSetFactory;
import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable;
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.*;
+import org.apache.kyuubi.shaded.thrift.TConfiguration;
import org.apache.kyuubi.shaded.thrift.TException;
import org.apache.kyuubi.shaded.thrift.protocol.TBinaryProtocol;
import org.apache.kyuubi.shaded.thrift.transport.THttpClient;
@@ -419,7 +420,13 @@ public class KyuubiConnection implements SQLConnection,
KyuubiLoggable {
boolean useSsl = isSslConnection();
// Create an http client from the configs
httpClient = getHttpClient(useSsl);
- transport = new THttpClient(getServerHttpUrl(useSsl), httpClient);
+ int maxMessageSize = getMaxMessageSize();
+ TConfiguration.Builder tConfBuilder = TConfiguration.custom();
+ if (maxMessageSize > 0) {
+ tConfBuilder.setMaxMessageSize(maxMessageSize);
+ }
+ TConfiguration tConf = tConfBuilder.build();
+ transport = new THttpClient(tConf, getServerHttpUrl(useSsl), httpClient);
return transport;
}
@@ -629,7 +636,8 @@ public class KyuubiConnection implements SQLConnection,
KyuubiLoggable {
}
/** Create underlying SSL or non-SSL transport */
- private TTransport createUnderlyingTransport() throws TTransportException {
+ private TTransport createUnderlyingTransport() throws TTransportException,
SQLException {
+ int maxMessageSize = getMaxMessageSize();
TTransport transport = null;
// Note: Thrift returns an SSL socket that is already bound to the
specified host:port
// Therefore an open called on this would be a no-op later
@@ -643,19 +651,46 @@ public class KyuubiConnection implements SQLConnection,
KyuubiLoggable {
Utils.getPassword(sessConfMap,
JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);
if (sslTrustStore == null || sslTrustStore.isEmpty()) {
- transport = ThriftUtils.getSSLSocket(host, port, connectTimeout,
socketTimeout);
+ transport =
+ ThriftUtils.getSSLSocket(host, port, connectTimeout,
socketTimeout, maxMessageSize);
} else {
transport =
ThriftUtils.getSSLSocket(
- host, port, connectTimeout, socketTimeout, sslTrustStore,
sslTrustStorePassword);
+ host,
+ port,
+ connectTimeout,
+ socketTimeout,
+ sslTrustStore,
+ sslTrustStorePassword,
+ maxMessageSize);
}
} else {
// get non-SSL socket transport
- transport = ThriftUtils.getSocketTransport(host, port, connectTimeout,
socketTimeout);
+ transport =
+ ThriftUtils.getSocketTransport(host, port, connectTimeout,
socketTimeout, maxMessageSize);
}
return transport;
}
+ private int getMaxMessageSize() throws SQLException {
+ String maxMessageSize =
sessConfMap.get(JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE);
+ if (maxMessageSize == null) {
+ return -1;
+ }
+
+ try {
+ return Integer.parseInt(maxMessageSize);
+ } catch (Exception e) {
+ String errFormat =
+ "Invalid {} configuration of '{}'. Expected an integer specifying
number of bytes. "
+ + "A configuration of <= 0 uses default max message size.";
+ String errMsg =
+ String.format(
+ errFormat, JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE,
maxMessageSize);
+ throw new SQLException(errMsg, "42000", e);
+ }
+ }
+
/**
* Create transport per the connection options Supported transport options
are: - SASL based
* transports over + Kerberos + SSL + non-SSL - Raw (non-SASL) socket
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
index 7f0099b299..331b871e08 100644
---
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
@@ -24,23 +24,70 @@ import
org.apache.kyuubi.shaded.thrift.transport.TSSLTransportFactory;
import org.apache.kyuubi.shaded.thrift.transport.TSocket;
import org.apache.kyuubi.shaded.thrift.transport.TTransport;
import org.apache.kyuubi.shaded.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class helps in some aspects of authentication. It creates the proper
Thrift classes for the
* given configuration as well as helps with authenticating requests.
*/
public class ThriftUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftUtils.class);
+
+ /**
+ * Configure the provided T transport's max message size.
+ *
+ * @param transport Transport to configure maxMessage for
+ * @param maxMessageSize Maximum allowed message size in bytes, less than or
equal to 0 means use
+ * the Thrift library default.
+ * @return The passed in T transport configured with desired max message
size. The same object
+ * passed in is returned.
+ */
+ public static <T extends TTransport> T configureThriftMaxMessageSize(
+ T transport, int maxMessageSize) {
+ if (maxMessageSize > 0) {
+ if (transport.getConfiguration() == null) {
+ LOG.warn(
+ "TTransport {} is returning a null Configuration, Thrift max
message size is not getting configured",
+ transport.getClass().getName());
+ return transport;
+ }
+ transport.getConfiguration().setMaxMessageSize(maxMessageSize);
+ }
+ return transport;
+ }
+
+ /**
+ * Create a TSocket for the provided host and port with specified
connectTimeout, loginTimeout and
+ * maxMessageSize.
+ *
+ * @param host Host to connect to.
+ * @param port Port to connect to.
+ * @param connectTimeout Socket connect timeout (0 means no timeout).
+ * @param socketTimeout Socket read/write timeout (0 means no timeout).
+ * @param maxMessageSize Size in bytes for max allowable Thrift message
size, less than or equal
+ * to 0 results in using the Thrift library default.
+ * @return TTransport TSocket for host/port
+ */
public static TTransport getSocketTransport(
- String host, int port, int connectTimeout, int socketTimeout) throws
TTransportException {
- return new TSocket(TConfiguration.DEFAULT, host, port, socketTimeout,
connectTimeout);
+ String host, int port, int connectTimeout, int socketTimeout, int
maxMessageSize)
+ throws TTransportException {
+ TConfiguration.Builder tConfBuilder = TConfiguration.custom();
+ if (maxMessageSize > 0) {
+ tConfBuilder.setMaxMessageSize(maxMessageSize);
+ }
+ TConfiguration tConf = tConfBuilder.build();
+ return new TSocket(tConf, host, port, socketTimeout, connectTimeout);
}
public static TTransport getSSLSocket(
- String host, int port, int connectTimeout, int socketTimeout) throws
TTransportException {
+ String host, int port, int connectTimeout, int socketTimeout, int
maxMessageSize)
+ throws TTransportException {
// The underlying SSLSocket object is bound to host:port with the given
SO_TIMEOUT
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port,
socketTimeout);
tSSLSocket.setConnectTimeout(connectTimeout);
- return getSSLSocketWithHttps(tSSLSocket);
+ return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
}
public static TTransport getSSLSocket(
@@ -49,7 +96,8 @@ public class ThriftUtils {
int connectTimeout,
int socketTimeout,
String trustStorePath,
- String trustStorePassWord)
+ String trustStorePassWord,
+ int maxMessageSize)
throws TTransportException {
TSSLTransportFactory.TSSLTransportParameters params =
new TSSLTransportFactory.TSSLTransportParameters();
@@ -59,16 +107,18 @@ public class ThriftUtils {
// SSLContext created with the given params
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port,
socketTimeout, params);
tSSLSocket.setConnectTimeout(connectTimeout);
- return getSSLSocketWithHttps(tSSLSocket);
+ return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
}
// Using endpoint identification algorithm as HTTPS enables us to do
// CNAMEs/subjectAltName verification
- private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws
TTransportException {
+ private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket, int
maxMessageSize)
+ throws TTransportException {
SSLSocket sslSocket = (SSLSocket) tSSLSocket.getSocket();
SSLParameters sslParams = sslSocket.getSSLParameters();
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
sslSocket.setSSLParameters(sslParams);
- return new TSocket(sslSocket);
+ TSocket tSocket = new TSocket(sslSocket);
+ return configureThriftMaxMessageSize(tSocket, maxMessageSize);
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index d34458c647..c36e9ec06c 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -470,8 +470,10 @@ private[kyuubi] object KyuubiSyncThriftClient extends
Logging {
host: String,
port: Int,
socketTimeout: Int,
- connectionTimeout: Int): TProtocol = {
- val tSocket = new TSocket(TConfiguration.DEFAULT, host, port,
socketTimeout, connectionTimeout)
+ connectionTimeout: Int,
+ maxMessageSize: Int): TProtocol = {
+ val tConf =
TConfiguration.custom().setMaxMessageSize(maxMessageSize).build()
+ val tSocket = new TSocket(tConf, host, port, socketTimeout,
connectionTimeout)
val tTransport = PlainSASLHelper.getPlainTransport(user, passwd, tSocket)
tTransport.open()
new TBinaryProtocol(tTransport)
@@ -485,15 +487,23 @@ private[kyuubi] object KyuubiSyncThriftClient extends
Logging {
conf: KyuubiConf): KyuubiSyncThriftClient = {
val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
val loginTimeout = conf.get(ENGINE_LOGIN_TIMEOUT).toInt
+ val maxMessageSize =
conf.get(KyuubiConf.FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE)
val aliveProbeEnabled = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
val aliveProbeInterval =
conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL).toInt
val aliveTimeout = conf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
- val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout)
+ val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout,
maxMessageSize)
val aliveProbeProtocol =
if (aliveProbeEnabled) {
- Option(createTProtocol(user, passwd, host, port, aliveProbeInterval,
loginTimeout))
+ Some(createTProtocol(
+ user,
+ passwd,
+ host,
+ port,
+ aliveProbeInterval,
+ loginTimeout,
+ maxMessageSize))
} else {
None
}