This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 38cc65043d6 HIVE-28824: Metastore should also respect the max thrift
message size (#5699) (Zhihua Deng, reviewed by Denys Kuzmenko)
38cc65043d6 is described below
commit 38cc65043d6fa40cba2adbe65648e52b41b1551e
Author: dengzh <[email protected]>
AuthorDate: Sun Mar 23 12:26:22 2025 +0800
HIVE-28824: Metastore should also respect the max thrift message size
(#5699) (Zhihua Deng, reviewed by Denys Kuzmenko)
---
.../apache/hive/minikdc/TestJdbcWithMiniKdc.java | 23 ++++++
.../minikdc/TestRemoteHiveMetaStoreKerberos.java | 14 +++-
.../apache/hive/minikdc/TestSSLWithMiniKdc.java | 12 +++
.../service/cli/thrift/ThriftBinaryCLIService.java | 12 ++-
.../hadoop/hive/metastore/HiveMetaStore.java | 87 ++++++----------------
.../hadoop/hive/metastore/HiveMetastoreCli.java | 79 ++++++++++++++++++++
.../hive/metastore/TServerSocketKeepAlive.java | 36 ---------
.../hive/metastore/TestHiveMetastoreCli.java | 8 +-
8 files changed, 163 insertions(+), 108 deletions(-)
diff --git
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java
index 9ffd336602d..78d819761fe 100644
---
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java
+++
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java
@@ -37,6 +37,7 @@
import org.apache.hive.service.cli.session.HiveSessionHook;
import org.apache.hive.service.cli.session.HiveSessionHookContext;
import org.apache.hive.service.cli.session.SessionUtils;
+import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -263,6 +264,28 @@ public void testNegativeProxyAuth() throws Exception {
.getConnection(miniHS2.getJdbcURL("default",
";hive.server2.proxy.user=" + MiniHiveKdc.HIVE_TEST_USER_2));
}
+ @Test
+ public void testHs2ThriftMaxMessageSize() throws Exception {
+ HiveConf.setVar(miniHS2.getHiveConf(),
HiveConf.ConfVars.HIVE_THRIFT_CLIENT_MAX_MESSAGE_SIZE, "512");
+ assertEquals(512L,
+ HiveConf.getSizeVar(miniHS2.getHiveConf(),
HiveConf.ConfVars.HIVE_THRIFT_CLIENT_MAX_MESSAGE_SIZE));
+ Connection conn = DriverManager.getConnection(miniHS2.getJdbcURL());
+ Statement stmt = conn.createStatement();
+ try {
+ StringBuilder createTable = new StringBuilder("create external table
tesths2thriftmaxmessagesize(");
+ for (int i = 0; i < 100; i++) {
+ createTable.append("abcdefghijklmnopqrstuvwxyz").append(i).append("
string, ");
+ }
+ createTable.append(" a int)");
+ Throwable t = assertThrows(SQLException.class, () ->
stmt.execute(createTable.toString())).getCause();
+ assertTrue(t instanceof TTransportException);
+ assertEquals(TTransportException.END_OF_FILE,
((TTransportException)t).getType());
+ assertTrue(t.getMessage().contains("Socket is closed by peer"));
+ } finally {
+
miniHS2.getHiveConf().unset(ConfVars.HIVE_THRIFT_CLIENT_MAX_MESSAGE_SIZE.varname);
+ }
+ }
+
/**
* Verify the config property value
* @param propertyName
diff --git
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java
index c24cb5b62bb..cf25ac08a4c 100644
---
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java
+++
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestRemoteHiveMetaStoreKerberos.java
@@ -21,7 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.TestRemoteHiveMetaStore;
-import org.apache.hadoop.hive.metastore.TestHiveMetaStore;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
@@ -92,6 +92,18 @@ public void testThriftMaxMessageSize() throws Throwable {
assertNotNull(partitions);
assertEquals("expected to receive the same number of partitions added",
values.size(), partitions.size());
+ // Set the max message size on Metastore
+ MetastoreConf.setVar(conf,
ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE, "1024");
+ MetastoreConf.setVar(clientConf,
ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE, "1048576000");
+ try (HiveMetaStoreClient client1 = new HiveMetaStoreClient(clientConf)) {
+ TTransportException te = assertThrows(TTransportException.class,
+ () -> client1.alter_partitions(dbName, tblName, partitions, new
EnvironmentContext()));
+ assertEquals(TTransportException.END_OF_FILE, te.getType());
+ assertTrue(te.getMessage().contains("Socket is closed by peer"));
+ } finally {
+
conf.unset(ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE.getVarname());
+ }
+
cleanUp(dbName, tblName, typeName);
}
diff --git
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
index 623340eaee5..faaf8776169 100644
---
a/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
+++
b/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestSSLWithMiniKdc.java
@@ -40,6 +40,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -117,6 +118,17 @@ public void testHmsThriftMaxMessageSize() throws Exception
{
// Verify the Thrift library is enforcing the limit
assertTrue(exceptionMessage.contains("MaxMessageSize reached"));
limitedClient.close();
+
+ MetastoreConf.setVar(clientConf,
MetastoreConf.ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE, "1048576000");
+ MetastoreConf.setVar(miniHS2.getHiveConf(),
MetastoreConf.ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE, "512");
+ tblBuilder.setTableName("testThriftMaxMessageSize1");
+ try (HiveMetaStoreClient client = new HiveMetaStoreClient(clientConf)) {
+ TTransportException te = assertThrows(TTransportException.class, () ->
tblBuilder.create(client, clientConf));
+ assertEquals(TTransportException.END_OF_FILE, te.getType());
+ assertTrue(te.getMessage().contains("Socket is closed by peer"));
+ } finally {
+
miniHS2.getHiveConf().unset(MetastoreConf.ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE.getVarname());
+ }
}
private Connection getConnection(String userName) throws Exception {
diff --git
a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index edb7631fabf..eeed5dd6b38 100644
---
a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++
b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -47,7 +47,9 @@
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
@@ -100,7 +102,15 @@ protected void initServer() {
// Server args
int maxMessageSize =
hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
- TThreadPoolServer.Args sargs = new
TThreadPoolServer.Args(serverSocket).processorFactory(processorFactory)
+ TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(new
TServerSocket(serverSocket.getServerSocket()) {
+ @Override
+ public TSocket accept() throws TTransportException {
+ TSocket ts = super.accept();
+ int maxThriftMessageSize = (int) HiveConf.getSizeVar(
+ hiveConf, HiveConf.ConfVars.HIVE_THRIFT_CLIENT_MAX_MESSAGE_SIZE);
+ return HiveAuthUtils.configureThriftMaxMessageSize(ts,
maxThriftMessageSize);
+ }
+ }).processorFactory(processorFactory)
.transportFactory(transportFactory).protocolFactory(new
TBinaryProtocol.Factory())
.inputProtocolFactory(new TBinaryProtocol.Factory(true, true,
maxMessageSize, maxMessageSize))
.executorService(executorService);
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 6cd45e34aff..9cbbe7e43fe 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -21,7 +21,6 @@
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
-import org.apache.commons.cli.OptionBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ZKDeRegisterWatcher;
import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
@@ -40,7 +39,6 @@
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import
org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
-import org.apache.hadoop.hive.metastore.utils.CommonCliOptions;
import org.apache.hadoop.hive.metastore.utils.LogUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.MetastoreVersionInfo;
@@ -61,7 +59,9 @@
import org.apache.thrift.server.TServlet;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import org.eclipse.jetty.security.ConstraintMapping;
@@ -219,69 +219,14 @@ public static long renewDelegationToken(String
tokenStrForm) throws IOException
return delegationTokenManager.renewDelegationToken(tokenStrForm);
}
- /**
- * HiveMetaStore specific CLI
- *
- */
- public static class HiveMetastoreCli extends CommonCliOptions {
- private int port;
-
- @SuppressWarnings("static-access")
- HiveMetastoreCli(Configuration configuration) {
- super("hivemetastore", true);
- this.port = MetastoreConf.getIntVar(configuration, ConfVars.SERVER_PORT);
-
- // -p port
- OPTIONS.addOption(OptionBuilder
- .hasArg()
- .withArgName("port")
- .withDescription("Hive Metastore port number, default:"
- + this.port)
- .create('p'));
-
- }
- @Override
- public void parse(String[] args) {
- super.parse(args);
-
- // support the old syntax "hivemetastore [port]" but complain
- args = commandLine.getArgs();
- if (args.length > 0) {
- // complain about the deprecated syntax -- but still run
- System.err.println(
- "This usage has been deprecated, consider using the new command "
- + "line syntax (run with -h to see usage information)");
-
- this.port = Integer.parseInt(args[0]);
- }
-
- // notice that command line options take precedence over the
- // deprecated (old style) naked args...
-
- if (commandLine.hasOption('p')) {
- this.port = Integer.parseInt(commandLine.getOptionValue('p'));
- } else {
- // legacy handling
- String metastorePort = System.getenv("METASTORE_PORT");
- if (metastorePort != null) {
- this.port = Integer.parseInt(metastorePort);
- }
- }
- }
-
- public int getPort() {
- return this.port;
- }
- }
-
/*
Interface to encapsulate Http and binary thrift server for
HiveMetastore
*/
private interface ThriftServer {
- public void start() throws Throwable;
- public boolean isRunning();
- public IHMSHandler getHandler();
+ void start() throws Throwable;
+ boolean isRunning();
+ IHMSHandler getHandler();
}
/**
@@ -316,7 +261,7 @@ public static void main(String[] args) throws Throwable {
startupShutdownMessage(HiveMetaStore.class, args, LOG);
try {
- String msg = "Starting hive metastore on port " + cli.port;
+ String msg = "Starting hive metastore on port " + cli.getPort();
LOG.info(msg);
if (cli.isVerbose()) {
System.err.println(msg);
@@ -631,10 +576,6 @@ private static ThriftServer startBinaryMetastore(int port,
HadoopThriftAuthBridg
keyStorePassword, keyStoreType, keyStoreAlgorithm,
sslVersionBlacklist);
}
- if (tcpKeepAlive) {
- serverSocket = new TServerSocketKeepAlive(serverSocket);
- }
-
ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads,
maxWorkerThreads,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(), r -> {
Thread thread = new Thread(r);
@@ -643,7 +584,21 @@ private static ThriftServer startBinaryMetastore(int port,
HadoopThriftAuthBridg
return thread;
});
- TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
+ TThreadPoolServer.Args args =
+ new TThreadPoolServer.Args(new
TServerSocket(serverSocket.getServerSocket()) {
+ @Override
+ public TSocket accept() throws TTransportException {
+ TSocket ts = super.accept();
+ // get the limit from the configuration for every new connection
+ int maxThriftMessageSize = (int) MetastoreConf.getSizeVar(
+ conf,
MetastoreConf.ConfVars.THRIFT_METASTORE_CLIENT_MAX_MESSAGE_SIZE);
+ HMSHandler.LOG.debug("Thrift maxMessageSize = {}",
maxThriftMessageSize);
+ if (maxThriftMessageSize > 0) {
+ ts.getConfiguration().setMaxMessageSize(maxThriftMessageSize);
+ }
+ return ts;
+ }
+ })
.processor(processor)
.transportFactory(transFactory)
.protocolFactory(protocolFactory)
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetastoreCli.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetastoreCli.java
new file mode 100644
index 00000000000..248eed0f687
--- /dev/null
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetastoreCli.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.CommonCliOptions;
+
+/**
+ * HiveMetaStore specific CLI
+ *
+ */
+class HiveMetastoreCli extends CommonCliOptions {
+ private int port;
+
+ @SuppressWarnings("static-access")
+ HiveMetastoreCli(Configuration configuration) {
+ super("hivemetastore", true);
+ this.port = MetastoreConf.getIntVar(configuration,
MetastoreConf.ConfVars.SERVER_PORT);
+
+ // -p port
+ OPTIONS.addOption(OptionBuilder
+ .hasArg()
+ .withArgName("port")
+ .withDescription("Hive Metastore port number, default:"
+ + this.port)
+ .create('p'));
+
+ }
+ @Override
+ public void parse(String[] args) {
+ super.parse(args);
+
+ // support the old syntax "hivemetastore [port]" but complain
+ args = commandLine.getArgs();
+ if (args.length > 0) {
+ // complain about the deprecated syntax -- but still run
+ System.err.println(
+ "This usage has been deprecated, consider using the new command "
+ + "line syntax (run with -h to see usage information)");
+
+ this.port = Integer.parseInt(args[0]);
+ }
+
+ // notice that command line options take precedence over the
+ // deprecated (old style) naked args...
+
+ if (commandLine.hasOption('p')) {
+ this.port = Integer.parseInt(commandLine.getOptionValue('p'));
+ } else {
+ // legacy handling
+ String metastorePort = System.getenv("METASTORE_PORT");
+ if (metastorePort != null) {
+ this.port = Integer.parseInt(metastorePort);
+ }
+ }
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TServerSocketKeepAlive.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TServerSocketKeepAlive.java
deleted file mode 100644
index e4b1b981da9..00000000000
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/TServerSocketKeepAlive.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.metastore;
-
-import java.net.SocketException;
-
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransportException;
-
-/**
- * TServerSocketKeepAlive - like TServerSocket, but will enable keepalive for
- * accepted sockets.
- *
- */
-public class TServerSocketKeepAlive extends TServerSocket {
- public TServerSocketKeepAlive(TServerSocket serverSocket) throws
TTransportException {
- super(serverSocket.getServerSocket());
- }
-}
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetastoreCli.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetastoreCli.java
index 0d10be1e447..672307c5b7a 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetastoreCli.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetastoreCli.java
@@ -32,14 +32,14 @@ public class TestHiveMetastoreCli {
@Test
public void testDefaultCliPortValue() {
Configuration configuration = MetastoreConf.newMetastoreConf();
- HiveMetaStore.HiveMetastoreCli cli = new
HiveMetaStore.HiveMetastoreCli(configuration);
+ HiveMetastoreCli cli = new HiveMetastoreCli(configuration);
assert (cli.getPort() == MetastoreConf.getIntVar(configuration,
ConfVars.SERVER_PORT));
}
@Test
public void testOverriddenCliPortValue() {
Configuration configuration = MetastoreConf.newMetastoreConf();
- HiveMetaStore.HiveMetastoreCli cli = new
HiveMetaStore.HiveMetastoreCli(configuration);
+ HiveMetastoreCli cli = new HiveMetastoreCli(configuration);
cli.parse(TestHiveMetastoreCli.CLI_ARGUMENTS);
assert (cli.getPort() == 9999);
@@ -50,7 +50,7 @@ public void testOverriddenMetastoreServerPortValue() {
Configuration configuration = MetastoreConf.newMetastoreConf();
MetastoreConf.setLongVar(configuration, ConfVars.SERVER_PORT, 12345);
- HiveMetaStore.HiveMetastoreCli cli = new
HiveMetaStore.HiveMetastoreCli(configuration);
+ HiveMetastoreCli cli = new HiveMetastoreCli(configuration);
assert (cli.getPort() == 12345);
}
@@ -60,7 +60,7 @@ public void testCliOverridesConfiguration() {
Configuration configuration = MetastoreConf.newMetastoreConf();
MetastoreConf.setLongVar(configuration, ConfVars.SERVER_PORT, 12345);
- HiveMetaStore.HiveMetastoreCli cli = new
HiveMetaStore.HiveMetastoreCli(configuration);
+ HiveMetastoreCli cli = new HiveMetastoreCli(configuration);
cli.parse(CLI_ARGUMENTS);
assert (cli.getPort() == 9999);