This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 088f4d71a8c HIVE-26633: Make Thrift MaxMessageSize configurable (John
Sherman, reviewed by Aman Sinha, Zhihua Deng)
088f4d71a8c is described below
commit 088f4d71a8c45b6f44d9f4be470f6eb7807a9ed8
Author: John Sherman <[email protected]>
AuthorDate: Wed Oct 19 19:56:31 2022 -0700
HIVE-26633: Make Thrift MaxMessageSize configurable (John Sherman, reviewed
by Aman Sinha, Zhihua Deng)
Closes #3674
---
.../hadoop/hive/common/auth/HiveAuthUtils.java | 101 +++++++++++++++++----
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +-
.../minikdc/TestRemoteHiveMetaStoreKerberos.java | 50 ++++++++++
.../apache/hive/minikdc/TestSSLWithMiniKdc.java | 32 +++++++
.../java/org/apache/hive/jdbc/miniHS2/MiniHS2.java | 7 +-
.../java/org/apache/hive/jdbc/HiveConnection.java | 29 +++++-
jdbc/src/java/org/apache/hive/jdbc/Utils.java | 1 +
.../cli/thrift/RetryingThriftCLIServiceClient.java | 4 +-
.../hadoop/hive/metastore/HiveMetaStoreClient.java | 19 +++-
.../hadoop/hive/metastore/conf/MetastoreConf.java | 4 +
.../hive/metastore/security/TFilterTransport.java | 24 ++---
.../hadoop/hive/metastore/TestHiveMetaStore.java | 9 +-
12 files changed, 237 insertions(+), 48 deletions(-)
diff --git
a/common/src/java/org/apache/hadoop/hive/common/auth/HiveAuthUtils.java
b/common/src/java/org/apache/hadoop/hive/common/auth/HiveAuthUtils.java
index 16b6bf77b91..a37d527c1ea 100644
--- a/common/src/java/org/apache/hadoop/hive/common/auth/HiveAuthUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/auth/HiveAuthUtils.java
@@ -33,9 +33,8 @@ import javax.net.ssl.TrustManagerFactory;
import com.google.common.base.Splitter;
import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.thrift.TConfiguration;
import org.apache.thrift.transport.TSSLTransportFactory;
+import
org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
@@ -50,45 +49,107 @@ import org.slf4j.LoggerFactory;
public class HiveAuthUtils {
private static final Logger LOG =
LoggerFactory.getLogger(HiveAuthUtils.class);
+ /**
+ * Configure the provided T transport's max message size.
+ * @param transport Transport to configure maxMessage for
+ * @param maxMessageSize Maximum allowed message size in bytes, less than or
equal to 0 means use the Thrift library
+ * default.
+ * @return The passed in T transport configured with desired max message
size. The same object passed in is returned.
+ */
+ public static <T extends TTransport> T configureThriftMaxMessageSize(T
transport, int maxMessageSize) {
+ if (maxMessageSize > 0) {
+ if (transport.getConfiguration() == null) {
+ LOG.warn("TTransport {} is returning a null Configuration, Thrift max
message size is not getting configured",
+ transport.getClass().getName());
+ return transport;
+ }
+ transport.getConfiguration().setMaxMessageSize(maxMessageSize);
+ }
+ return transport;
+ }
+
+ /**
+ * Create a TSocket for the provided host and port with specified
loginTimeout. Thrift maxMessageSize
+ * will default to Thrift library default.
+ * @param host Host to connect to.
+ * @param port Port to connect to.
+ * @param loginTimeout Socket timeout (0 means no timeout).
+ * @return TTransport TSocket for host/port.
+ */
public static TTransport getSocketTransport(String host, int port, int
loginTimeout) throws TTransportException {
- return new TSocket(new TConfiguration(),host, port, loginTimeout);
+ return getSocketTransport(host, port, loginTimeout, /* maxMessageSize */
-1);
+ }
+
+ /**
+ * Create a TSocket for the provided host and port with specified
loginTimeout and maxMessageSize.
+ * A maxMessageSize less than or equal to 0 will default to the Thrift
library default.
+ * @param host Host to connect to.
+ * @param port Port to connect to.
+ * @param loginTimeout Socket timeout (0 means no timeout).
+ * @param maxMessageSize Size in bytes for max allowable Thrift message
size, less than or equal to 0
+ * results in using the Thrift library default.
+ * @return TTransport TSocket for host/port
+ */
+ public static TTransport getSocketTransport(String host, int port, int
loginTimeout, int maxMessageSize)
+ throws TTransportException {
+ TSocket tSocket = new TSocket(host, port, loginTimeout);
+ return configureThriftMaxMessageSize(tSocket, maxMessageSize);
+ }
+
+ public static TTransport getSSLSocket(String host, int port, int
loginTimeout, TSSLTransportParameters params,
+ int maxMessageSize) throws TTransportException {
+ // The underlying SSLSocket object is bound to host:port with the given
SO_TIMEOUT and
+ // SSLContext created with the given params
+ TSocket tSSLSocket = null;
+ if (params != null) {
+ tSSLSocket = TSSLTransportFactory.getClientSocket(host, port,
loginTimeout, params);
+ } else {
+ tSSLSocket = TSSLTransportFactory.getClientSocket(host, port,
loginTimeout);
+ }
+ return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
+ }
+
+ public static TTransport getSSLSocket(String host, int port, int
loginTimeout, String trustStorePath,
+ String trustStorePassWord, String trustStoreType, String
trustStoreAlgorithm) throws TTransportException {
+ return getSSLSocket(host, port, loginTimeout, trustStorePath,
trustStorePassWord, trustStoreType,
+ trustStoreAlgorithm, /* maxMessageSize */ -1);
}
- public static TTransport getSSLSocket(String host, int port, int
loginTimeout)
- throws TTransportException {
- // The underlying SSLSocket object is bound to host:port with the given
SO_TIMEOUT
- TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port,
loginTimeout);
- return getSSLSocketWithHttps(tSSLSocket);
+ public static TTransport getSSLSocket(String host, int port, int
loginTimeout) throws TTransportException {
+ return getSSLSocket(host, port, loginTimeout, /* maxMessageSize */ -1);
}
- public static TTransport getSSLSocket(String host, int port, int
loginTimeout,
- String trustStorePath, String trustStorePassWord, String trustStoreType,
- String trustStoreAlgorithm) throws TTransportException {
- TSSLTransportFactory.TSSLTransportParameters params =
- new TSSLTransportFactory.TSSLTransportParameters();
+ public static TTransport getSSLSocket(String host, int port, int
loginTimeout, int maxMessageSize)
+ throws TTransportException {
+ return getSSLSocket(host, port, loginTimeout, /* params */ null,
maxMessageSize);
+ }
+
+ public static TTransport getSSLSocket(String host, int port, int
loginTimeout, String trustStorePath,
+ String trustStorePassWord, String trustStoreType, String
trustStoreAlgorithm, int maxMessageSize)
+ throws TTransportException {
+ TSSLTransportParameters params = new TSSLTransportParameters();
String tStoreType = trustStoreType.isEmpty()? KeyStore.getDefaultType() :
trustStoreType;
String tStoreAlgorithm = trustStoreAlgorithm.isEmpty()?
TrustManagerFactory.getDefaultAlgorithm() : trustStoreAlgorithm;
params.setTrustStore(trustStorePath, trustStorePassWord, tStoreAlgorithm,
tStoreType);
params.requireClientAuth(true);
- // The underlying SSLSocket object is bound to host:port with the given
SO_TIMEOUT and
- // SSLContext created with the given params
- TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port,
loginTimeout, params);
- return getSSLSocketWithHttps(tSSLSocket);
+ return getSSLSocket(host, port, loginTimeout, params, maxMessageSize);
}
// Using endpoint identification algorithm as HTTPS enables us to do
// CNAMEs/subjectAltName verification
- private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws
TTransportException {
+ private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket, int
maxMessageSize)
+ throws TTransportException {
SSLSocket sslSocket = (SSLSocket) tSSLSocket.getSocket();
SSLParameters sslParams = sslSocket.getSSLParameters();
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
sslSocket.setSSLParameters(sslParams);
- return new TSocket(sslSocket);
+ TSocket tSocket = new TSocket(sslSocket);
+ return configureThriftMaxMessageSize(tSocket, maxMessageSize);
}
public static TServerSocket getServerSocket(String hiveHost, int portNum)
- throws TTransportException {
+ throws TTransportException {
InetSocketAddress serverAddress;
if (hiveHost == null || hiveHost.isEmpty()) {
// Wildcard bind
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f0b1a7c1bfd..1e06e0a956b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2906,7 +2906,10 @@ public class HiveConf extends Configuration {
HIVE_STATS_MAX_NUM_STATS("hive.stats.max.num.stats", (long) 10000,
"When the number of stats to be updated is huge, this value is used to
control the number of \n" +
" stats to be sent to HMS for update."),
-
+ HIVE_THRIFT_CLIENT_MAX_MESSAGE_SIZE("hive.thrift.client.max.message.size",
"1gb",
+ new SizeValidator(-1L, true, (long) Integer.MAX_VALUE, true),
+ "Thrift client configuration for max message size. 0 or -1 will use
the default defined in the Thrift " +
+ "library. The upper limit is 2147483647 bytes (Integer.MAX_VALUE, about
2gb)."),
// Concurrency
HIVE_SUPPORT_CONCURRENCY("hive.support.concurrency", false,
"Whether Hive supports concurrency control or not. \n" +
diff --git
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java
index 50be9d76984..c24cb5b62bb 100644
---
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java
+++
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java
@@ -18,11 +18,24 @@
package org.apache.hive.minikdc;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.TestRemoteHiveMetaStore;
+import org.apache.hadoop.hive.metastore.TestHiveMetaStore;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.thrift.transport.TTransportException;
import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
public class TestRemoteHiveMetaStoreKerberos extends TestRemoteHiveMetaStore {
private static MiniHiveKdc miniKDC;
@@ -45,6 +58,43 @@ public class TestRemoteHiveMetaStoreKerberos extends
TestRemoteHiveMetaStore {
super.setUp();
}
+ @Test
+ public void testThriftMaxMessageSize() throws Throwable {
+ String dbName = "compdb";
+ String tblName = "comptbl";
+ String typeName = "Person";
+
+ cleanUp(dbName, tblName, typeName);
+ List<List<String>> values = new ArrayList<>();
+ values.add(makeVals("2008-07-01 14:13:12", "14"));
+ values.add(makeVals("2008-07-01 14:13:12", "15"));
+ values.add(makeVals("2008-07-02 14:13:12", "15"));
+ values.add(makeVals("2008-07-03 14:13:12", "151"));
+
+ createMultiPartitionTableSchema(dbName, tblName, typeName, values);
+
+ Configuration clientConf = MetastoreConf.newMetastoreConf(new
Configuration(conf));
+ MetastoreConf.setVar(clientConf, ConfVars.THRIFT_URIS,
"thrift://localhost:" + port);
+ // set to a low value to prove THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE is
being honored
+ // (it should throw an exception)
+ MetastoreConf.setVar(clientConf,
ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE, "1024");
+ HiveMetaStoreClient limitedClient = new HiveMetaStoreClient(clientConf);
+ Exception expectedException = assertThrows(TTransportException.class, ()
-> {
+ limitedClient.listPartitions(dbName, tblName, (short)-1);
+ });
+ String exceptionMessage = expectedException.getMessage();
+ // Verify the Thrift library is enforcing the limit
+ assertTrue(exceptionMessage.contains("MaxMessageSize reached"));
+ limitedClient.close();
+
+ // test default client (with a default
THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE)
+ List<Partition> partitions = client.listPartitions(dbName, tblName,
(short) -1);
+ assertNotNull(partitions);
+ assertEquals("expected to receive the same number of partitions added",
values.size(), partitions.size());
+
+ cleanUp(dbName, tblName, typeName);
+ }
+
@Override
protected HiveMetaStoreClient createClient() throws Exception {
MetastoreConf.setVar(conf, ConfVars.THRIFT_URIS, "thrift://localhost:" +
port);
diff --git
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
index cea9503697a..7ca74efb648 100644
---
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
+++
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
@@ -24,16 +24,24 @@ import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.ColumnType;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.apache.thrift.transport.TTransportException;
import org.hadoop.hive.jdbc.SSLTestUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
public class TestSSLWithMiniKdc {
@@ -84,6 +92,30 @@ public class TestSSLWithMiniKdc {
stmt.close();
}
+ @Test
+ public void testHmsThriftMaxMessageSize() throws Exception {
+ Configuration clientConf = MetastoreConf.newMetastoreConf(new
Configuration(miniHS2.getHiveConf()));
+ MetastoreConf.setVar(clientConf, MetastoreConf.ConfVars.THRIFT_URIS,
"thrift://localhost:" + miniHS2.getHmsPort());
+ // set to a low value to prove THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE is
being honored
+ // (it should throw an exception)
+ MetastoreConf.setVar(clientConf,
MetastoreConf.ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE, "512");
+ HiveMetaStoreClient limitedClient = new HiveMetaStoreClient(clientConf);
+ String dbName = "default";
+ String tableName = "testThriftMaxMessageSize";
+ TableBuilder tblBuilder = new
TableBuilder().setDbName(dbName).setTableName(tableName);
+ for (int i = 0; i <= 10; i++) {
+ tblBuilder.addCol("abcdefghijklmnopqrstuvwxyz" + i,
ColumnType.STRING_TYPE_NAME);
+ }
+ tblBuilder.create(limitedClient, clientConf);
+ Exception expectedException = assertThrows(TTransportException.class, ()
-> {
+ limitedClient.getTable(dbName, tableName);
+ });
+ String exceptionMessage = expectedException.getMessage();
+ // Verify the Thrift library is enforcing the limit
+ assertTrue(exceptionMessage.contains("MaxMessageSize reached"));
+ limitedClient.close();
+ }
+
private Connection getConnection(String userName) throws Exception {
miniHiveKdc.loginUser(userName);
return DriverManager.getConnection(miniHS2.getJdbcURL("default",
SSLTestUtils.SSL_CONN_PARAMS),
diff --git
a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index e85a6bcd9f7..9e95d3b2db9 100644
--- a/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/util/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -87,6 +87,7 @@ public class MiniHS2 extends AbstractHiveService {
private boolean usePortsFromConf = false;
private PamAuthenticator pamAuthenticator;
private boolean createTransactionalTables;
+ private int hmsPort = 0;
public enum MiniClusterType {
MR,
@@ -372,7 +373,7 @@ public class MiniHS2 extends AbstractHiveService {
public void start(Map<String, String> confOverlay) throws Exception {
if (isMetastoreRemote) {
-
MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(),
getHiveConf(),
+ hmsPort =
MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(),
getHiveConf(),
false, false, false, false, createTransactionalTables);
setWareHouseDir(MetastoreConf.getVar(getHiveConf(),
MetastoreConf.ConfVars.WAREHOUSE));
}
@@ -728,4 +729,8 @@ public class MiniHS2 extends AbstractHiveService {
// Ignore. Safe if it does not exist.
}
}
+
+ public int getHmsPort() {
+ return hmsPort;
+ }
}
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index abc543843a5..4a6fb7c423d 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -534,6 +534,7 @@ public class HiveConnection implements java.sql.Connection {
validateSslForBrowserMode();
httpClient = getHttpClient(useSsl);
transport = new THttpClient(getServerHttpUrl(useSsl), httpClient);
+ HiveAuthUtils.configureThriftMaxMessageSize(transport,
getMaxMessageSize());
return transport;
}
@@ -849,8 +850,10 @@ public class HiveConnection implements java.sql.Connection
{
*
* @return TTransport
* @throws TTransportException
+ * @throws SQLException
*/
- private TTransport createUnderlyingTransport() throws TTransportException {
+ private TTransport createUnderlyingTransport() throws TTransportException,
SQLException {
+ int maxMessageSize = getMaxMessageSize();
TTransport transport = null;
// Note: Thrift returns an SSL socket that is already bound to the
specified host:port
// Therefore an open called on this would be a no-op later
@@ -864,7 +867,7 @@ public class HiveConnection implements java.sql.Connection {
JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);
if (sslTrustStore == null || sslTrustStore.isEmpty()) {
- transport = HiveAuthUtils.getSSLSocket(host, port, loginTimeout);
+ transport = HiveAuthUtils.getSSLSocket(host, port, loginTimeout,
maxMessageSize);
} else {
String trustStoreType =
sessConfMap.get(JdbcConnectionParams.SSL_TRUST_STORE_TYPE);
@@ -876,16 +879,32 @@ public class HiveConnection implements
java.sql.Connection {
if (trustStoreAlgorithm == null) {
trustStoreAlgorithm = "";
}
- transport = HiveAuthUtils.getSSLSocket(host, port, loginTimeout,
- sslTrustStore, sslTrustStorePassword, trustStoreType,
trustStoreAlgorithm);
+ transport = HiveAuthUtils.getSSLSocket(host, port, loginTimeout,
sslTrustStore, sslTrustStorePassword,
+ trustStoreType, trustStoreAlgorithm, maxMessageSize);
}
} else {
// get non-SSL socket transport
- transport = HiveAuthUtils.getSocketTransport(host, port, loginTimeout);
+ transport = HiveAuthUtils.getSocketTransport(host, port, loginTimeout,
maxMessageSize);
}
return transport;
}
+ private int getMaxMessageSize() throws SQLException {
+ String maxMessageSize =
sessConfMap.get(JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE);
+ if (maxMessageSize == null) {
+ return -1;
+ }
+
+ try {
+ return Integer.parseInt(maxMessageSize);
+ } catch (Exception e) {
+ String errFormat = "Invalid %s configuration of '%s'. Expected an
integer specifying number of bytes. " +
+ "A configuration of <= 0 uses default max message size.";
+ String errMsg = String.format(errFormat,
JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE, maxMessageSize);
+ throw new SQLException(errMsg, "42000", e);
+ }
+ }
+
/**
* Create transport per the connection options
* Supported transport options are:
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index 764ae11b08b..c1be6a52df4 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -165,6 +165,7 @@ public class Utils {
// Create external purge table by default
static final String CREATE_TABLE_AS_EXTERNAL =
"hiveCreateAsExternalLegacy";
public static final String SOCKET_TIMEOUT = "socketTimeout";
+ static final String THRIFT_CLIENT_MAX_MESSAGE_SIZE =
"thrift.client.max.message.size";
// We support ways to specify application name modeled after some existing
DBs, since
// there's no standard approach.
diff --git
a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
index 53641f31c84..9079c652d0a 100644
---
a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
+++
b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import javax.security.sasl.SaslException;
+import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.PlainSaslHelper;
@@ -309,9 +310,10 @@ public class RetryingThriftCLIServiceClient implements
InvocationHandler {
String host = conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
int port = conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT);
+ int maxThriftMessageSize = (int)
conf.getSizeVar(HiveConf.ConfVars.HIVE_THRIFT_CLIENT_MAX_MESSAGE_SIZE);
LOG.info("Connecting to " + host + ":" + port);
- transport = new TSocket(host, port);
+ transport = HiveAuthUtils.getSocketTransport(host, port, 0,
maxThriftMessageSize);
((TSocket) transport).setTimeout((int)
conf.getTimeVar(HiveConf.ConfVars.SERVER_READ_SOCKET_TIMEOUT,
TimeUnit.SECONDS) * 1000);
try {
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 2ed37e7c111..53c7c6ba7a2 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -608,6 +608,19 @@ public class HiveMetaStoreClient implements
IMetaStoreClient, AutoCloseable {
client.rename_partition_req(req);
}
+ private <T extends TTransport> T configureThriftMaxMessageSize(T transport) {
+ int maxThriftMessageSize = (int) MetastoreConf.getSizeVar(conf,
ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE);
+ if (maxThriftMessageSize > 0) {
+ if (transport.getConfiguration() == null) {
+ LOG.warn("TTransport {} is returning a null Configuration, Thrift max
message size is not getting configured",
+ transport.getClass().getName());
+ return transport;
+ }
+ transport.getConfiguration().setMaxMessageSize(maxThriftMessageSize);
+ }
+ return transport;
+ }
+
/*
Creates a THttpClient if HTTP mode is enabled. If Client auth mode is set to
JWT,
then the method fetches JWT from environment variable: HMS_JWT and sets in
auth
@@ -681,7 +694,7 @@ public class HiveMetaStoreClient implements
IMetaStoreClient, AutoCloseable {
}
}
LOG.debug("Created thrift http client for URL: " + httpUrl);
- return tHttpClient;
+ return configureThriftMaxMessageSize(tHttpClient);
}
private TTransport createBinaryClient(URI store, boolean useSSL) throws
TTransportException,
@@ -705,7 +718,7 @@ public class HiveMetaStoreClient implements
IMetaStoreClient, AutoCloseable {
binaryTransport = SecurityUtils.getSSLSocket(store.getHost(),
store.getPort(), clientSocketTimeout,
trustStorePath, trustStorePassword, trustStoreType,
trustStoreAlgorithm);
} else {
- binaryTransport = new TSocket(new TConfiguration(),store.getHost(),
store.getPort(),
+ binaryTransport = new TSocket(new TConfiguration(), store.getHost(),
store.getPort(),
clientSocketTimeout);
}
binaryTransport = createAuthBinaryTransport(store, binaryTransport);
@@ -718,7 +731,7 @@ public class HiveMetaStoreClient implements
IMetaStoreClient, AutoCloseable {
}
}
LOG.debug("Created thrift binary client for URI: " + store);
- return binaryTransport;
+ return configureThriftMaxMessageSize(binaryTransport);
}
private void open() throws MetaException {
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index d3c08a540cf..460e41b418e 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1399,6 +1399,10 @@ public class MetastoreConf {
"If dynamic service discovery mode is set, the URIs are used
to connect to the" +
" corresponding service discovery servers e.g. a zookeeper.
Otherwise they are " +
"used as URIs for remote metastore."),
+
THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE("metastore.thrift.client.max.message.size",
+ "hive.thrift.client.max.message.size", "1gb", new SizeValidator(-1L,
true, (long) Integer.MAX_VALUE, true),
+ "Thrift client configuration for max message size. 0 or -1 will use
the default defined in the Thrift " +
+ "library. The upper limit is 2147483647 bytes (Integer.MAX_VALUE, about
2gb)."),
THRIFT_SERVICE_DISCOVERY_MODE("metastore.service.discovery.mode",
"hive.metastore.service.discovery.mode",
"",
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java
index 5ff672c5444..e9670e38ded 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/security/TFilterTransport.java
@@ -98,18 +98,18 @@ import org.apache.thrift.transport.TTransportException;
wrapped.consumeBuffer(len);
}
- @Override
- public TConfiguration getConfiguration() {
- return null;
- }
-
- @Override
- public void updateKnownMessageSize(long l) throws TTransportException {
-
- }
+ @Override
+ public TConfiguration getConfiguration() {
+ return wrapped.getConfiguration();
+ }
- @Override
- public void checkReadBytesAvailable(long l) throws TTransportException {
+ @Override
+ public void updateKnownMessageSize(long l) throws TTransportException {
+ wrapped.updateKnownMessageSize(l);
+ }
- }
+ @Override
+ public void checkReadBytesAvailable(long l) throws TTransportException {
+ wrapped.checkReadBytesAvailable(l);
+ }
}
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
index 67fc7063662..5217e8d8506 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
@@ -495,7 +495,7 @@ public abstract class TestHiveMetaStore {
assertTrue("Not all parts returned",
mpartial.containsAll(expectedPartitions));
}
- private static List<String> makeVals(String ds, String id) {
+ public static List<String> makeVals(String ds, String id) {
List <String> vals4 = new ArrayList<>(2);
vals4.add(ds);
vals4.add(id);
@@ -552,7 +552,6 @@ public abstract class TestHiveMetaStore {
" partitions",values.size(), partitions.size());
cleanUp(dbName, tblName, typeName);
-
}
@Test
@@ -3005,7 +3004,7 @@ public abstract class TestHiveMetaStore {
stmt.executeUpdate();
}
- private void cleanUp(String dbName, String tableName, String typeName)
throws Exception {
+ protected void cleanUp(String dbName, String tableName, String typeName)
throws Exception {
if(dbName != null && tableName != null) {
client.dropTable(dbName, tableName);
}
@@ -3017,7 +3016,7 @@ public abstract class TestHiveMetaStore {
}
}
- private Database createDb(String dbName) throws Exception {
+ protected Database createDb(String dbName) throws Exception {
if(null == dbName) { return null; }
return new DatabaseBuilder()
.setName(dbName)
@@ -3101,7 +3100,7 @@ public abstract class TestHiveMetaStore {
return partitions;
}
- private List<Partition> createMultiPartitionTableSchema(String dbName,
String tblName,
+ protected List<Partition> createMultiPartitionTableSchema(String dbName,
String tblName,
String typeName, List<List<String>> values) throws Throwable {
return createMultiPartitionTableSchema(null, dbName, tblName, typeName,
values);
}