This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 528a964e5f9 [improvement](fe) Migrate HMS client pool to Commons Pool
(#61553)
528a964e5f9 is described below
commit 528a964e5f9ca63bff0488b70d9e4b2c9a586fba
Author: Socrates <[email protected]>
AuthorDate: Wed Apr 1 06:57:11 2026 +0800
[improvement](fe) Migrate HMS client pool to Commons Pool (#61553)
### What problem does this PR solve?
Problem Summary:
This PR migrates `ThriftHMSCachedClient` from a self-managed queue-based
HMS client pool to `commons-pool2` while keeping the external FE
interface unchanged.
It also clarifies the responsibility boundary of the HMS pool:
- The pool only manages FE-side client lifecycle, including create,
borrow/return, invalidate on failure, and destroy on close.
- Hive-side socket lifetime and reconnect remain handled by
`RetryingMetaStoreClient`.
- The pool does not interpret `hive.metastore.client.socket.lifetime` or
probe remote socket health.
Additional changes in this PR:
- Preserve throwable-based invalidation so failed borrowers are not
returned to the pool.
- Fix nested client-borrow hazards in `updateTableStatistics` and in the
lock-polling path (`checkLock` now reuses the already-borrowed client
instead of borrowing a second one), which could otherwise block under
strict pool limits.
- Add FE unit coverage for normal return, invalidation, blocking borrow
reuse, close behavior, and single-client nested-call behavior.
---
.../datasource/hive/ThriftHMSCachedClient.java | 307 +++++++++++----
.../datasource/hive/ThriftHMSCachedClientTest.java | 416 +++++++++++++++++++++
2 files changed, 640 insertions(+), 83 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index 840ad765587..c58fe3bb58f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -31,6 +31,11 @@ import
com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -69,12 +74,10 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
-import java.util.Queue;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -88,32 +91,41 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
private static final HiveMetaHookLoader DUMMY_HOOK_LOADER = t -> null;
// -1 means no limit on the partitions returned.
private static final short MAX_LIST_PARTITION_NUM =
Config.max_hive_list_partition_num;
+ private static final long CLIENT_POOL_BORROW_TIMEOUT_MS = 60_000L;
- private Queue<ThriftHMSClient> clientPool = new LinkedList<>();
- private boolean isClosed = false;
- private final int poolSize;
+ private final GenericObjectPool<ThriftHMSClient> clientPool;
+ private volatile boolean isClosed = false;
private final HiveConf hiveConf;
- private ExecutionAuthenticator executionAuthenticator;
+ private final ExecutionAuthenticator executionAuthenticator;
+ private final MetaStoreClientProvider metaStoreClientProvider;
public ThriftHMSCachedClient(HiveConf hiveConf, int poolSize,
ExecutionAuthenticator executionAuthenticator) {
- Preconditions.checkArgument(poolSize > 0, poolSize);
+ this(hiveConf, poolSize, executionAuthenticator, new
DefaultMetaStoreClientProvider());
+ }
+
+ ThriftHMSCachedClient(HiveConf hiveConf, int poolSize,
ExecutionAuthenticator executionAuthenticator,
+ MetaStoreClientProvider metaStoreClientProvider) {
+ Preconditions.checkArgument(poolSize >= 0, poolSize);
this.hiveConf = hiveConf;
- this.poolSize = poolSize;
- this.isClosed = false;
this.executionAuthenticator = executionAuthenticator;
+ this.metaStoreClientProvider =
Preconditions.checkNotNull(metaStoreClientProvider, "metaStoreClientProvider");
+ this.clientPool = poolSize == 0 ? null
+ : new GenericObjectPool<>(new ThriftHMSClientFactory(),
createPoolConfig(poolSize));
}
@Override
public void close() {
- synchronized (clientPool) {
- this.isClosed = true;
- while (!clientPool.isEmpty()) {
- try {
- clientPool.poll().close();
- } catch (Exception e) {
- LOG.warn("failed to close thrift client", e);
- }
- }
+ if (isClosed) {
+ return;
+ }
+ isClosed = true;
+ if (clientPool == null) {
+ return;
+ }
+ try {
+ clientPool.close();
+ } catch (Exception e) {
+ LOG.warn("failed to close thrift client pool", e);
}
}
@@ -545,7 +557,7 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
"acquire lock timeout for txn " + txnId + " of
query " + queryId + ", timeout(ms): "
+ timeoutMs);
}
- response = checkLock(lockId);
+ response = checkLock(client, lockId);
}
if (response.getState() != LockState.ACQUIRED) {
@@ -599,15 +611,11 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
}
}
- private LockResponse checkLock(long lockId) {
- try (ThriftHMSClient client = getClient()) {
- try {
- return ugiDoAs(() -> client.client.checkLock(lockId));
- } catch (Exception e) {
- client.setThrowable(e);
- throw e;
- }
+ private LockResponse checkLock(ThriftHMSClient client, long lockId) {
+ try {
+ return ugiDoAs(() -> client.client.checkLock(lockId));
} catch (Exception e) {
+ client.setThrowable(e);
throw new RuntimeException("failed to check lock " + lockId, e);
}
}
@@ -636,53 +644,155 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
return builder.build();
}
+ /**
+ * The Doris HMS pool only manages client object lifecycle in FE:
+ * 1. Create clients.
+ * 2. Borrow and return clients.
+ * 3. Invalidate borrowers that have already failed.
+ * 4. Destroy clients when the pool is closed.
+ *
+ * The pool does not manage Hive-side socket lifetime or reconnect:
+ * 1. RetryingMetaStoreClient handles
hive.metastore.client.socket.lifetime itself.
+ * 2. The pool does not interpret that config.
+ * 3. The pool does not probe remote socket health.
+ */
+ private GenericObjectPoolConfig createPoolConfig(int poolSize) {
+ GenericObjectPoolConfig config = new GenericObjectPoolConfig();
+ config.setMaxTotal(poolSize);
+ config.setMaxIdle(poolSize);
+ config.setMinIdle(0);
+ config.setBlockWhenExhausted(true);
+ config.setMaxWaitMillis(CLIENT_POOL_BORROW_TIMEOUT_MS);
+ config.setTestOnBorrow(false);
+ config.setTestOnReturn(false);
+ config.setTestWhileIdle(false);
+ config.setTimeBetweenEvictionRunsMillis(-1L);
+ return config;
+ }
+
+ static String getMetastoreClientClassName(HiveConf hiveConf) {
+ String type = hiveConf.get(HMSBaseProperties.HIVE_METASTORE_TYPE);
+ if (HMSBaseProperties.DLF_TYPE.equalsIgnoreCase(type)) {
+ return ProxyMetaStoreClient.class.getName();
+ } else if (HMSBaseProperties.GLUE_TYPE.equalsIgnoreCase(type)) {
+ return AWSCatalogMetastoreClient.class.getName();
+ } else {
+ return HiveMetaStoreClient.class.getName();
+ }
+ }
+
+ private <T> T withSystemClassLoader(PrivilegedExceptionAction<T> action)
throws Exception {
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ try {
+
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
+ return action.run();
+ } finally {
+ Thread.currentThread().setContextClassLoader(classLoader);
+ }
+ }
+
+ private class ThriftHMSClientFactory extends
BasePooledObjectFactory<ThriftHMSClient> {
+ @Override
+ public ThriftHMSClient create() throws Exception {
+ return createClient();
+ }
+
+ @Override
+ public PooledObject<ThriftHMSClient> wrap(ThriftHMSClient client) {
+ return new DefaultPooledObject<>(client);
+ }
+
+ @Override
+ public boolean validateObject(PooledObject<ThriftHMSClient>
pooledObject) {
+ return !isClosed && pooledObject.getObject().isValid();
+ }
+
+ @Override
+ public void destroyObject(PooledObject<ThriftHMSClient> pooledObject)
throws Exception {
+ pooledObject.getObject().destroy();
+ }
+ }
+
private class ThriftHMSClient implements AutoCloseable {
private final IMetaStoreClient client;
+ private volatile boolean destroyed;
private volatile Throwable throwable;
- private ThriftHMSClient(HiveConf hiveConf) throws MetaException {
- String type = hiveConf.get(HMSBaseProperties.HIVE_METASTORE_TYPE);
- if (HMSBaseProperties.DLF_TYPE.equalsIgnoreCase(type)) {
- client = RetryingMetaStoreClient.getProxy(hiveConf,
DUMMY_HOOK_LOADER,
- ProxyMetaStoreClient.class.getName());
- } else if (HMSBaseProperties.GLUE_TYPE.equalsIgnoreCase(type)) {
- client = RetryingMetaStoreClient.getProxy(hiveConf,
DUMMY_HOOK_LOADER,
- AWSCatalogMetastoreClient.class.getName());
- } else {
- client = RetryingMetaStoreClient.getProxy(hiveConf,
DUMMY_HOOK_LOADER,
- HiveMetaStoreClient.class.getName());
- }
+ private ThriftHMSClient(IMetaStoreClient client) {
+ this.client = client;
}
public void setThrowable(Throwable throwable) {
this.throwable = throwable;
}
+ private boolean isValid() {
+ return !destroyed && throwable == null;
+ }
+
+ private void destroy() {
+ if (destroyed) {
+ return;
+ }
+ destroyed = true;
+ client.close();
+ }
+
@Override
public void close() throws Exception {
- synchronized (clientPool) {
- if (isClosed || throwable != null || clientPool.size() >
poolSize) {
- client.close();
+ if (clientPool == null) {
+ destroy();
+ return;
+ }
+ if (isClosed) {
+ destroy();
+ return;
+ }
+ try {
+ if (throwable != null) {
+ clientPool.invalidateObject(this);
} else {
- clientPool.offer(this);
+ clientPool.returnObject(this);
}
+ } catch (IllegalStateException e) {
+ destroy();
+ } catch (Exception e) {
+ destroy();
+ throw e;
}
}
}
private ThriftHMSClient getClient() {
- ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
try {
-
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
- synchronized (clientPool) {
- ThriftHMSClient client = clientPool.poll();
- if (client == null) {
- return ugiDoAs(() -> new ThriftHMSClient(hiveConf));
- }
- return client;
+ Preconditions.checkState(!isClosed, "HMS client pool is closed");
+ if (clientPool == null) {
+ return createClient();
}
- } finally {
- Thread.currentThread().setContextClassLoader(classLoader);
+ return clientPool.borrowObject();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HMSClientException("failed to borrow hms client from
pool", e);
+ }
+ }
+
+ private ThriftHMSClient createClient() throws Exception {
+ return withSystemClassLoader(() -> ugiDoAs(
+ () -> new
ThriftHMSClient(metaStoreClientProvider.create(hiveConf))));
+ }
+
+ // Keep the HMS client creation behind an injectable seam so unit tests
can verify
+ // Doris-side pool behavior without relying on Hive static construction
internals.
+ interface MetaStoreClientProvider {
+ IMetaStoreClient create(HiveConf hiveConf) throws MetaException;
+ }
+
+ private static class DefaultMetaStoreClientProvider implements
MetaStoreClientProvider {
+ @Override
+ public IMetaStoreClient create(HiveConf hiveConf) throws MetaException
{
+ return RetryingMetaStoreClient.getProxy(hiveConf,
DUMMY_HOOK_LOADER,
+ getMetastoreClientClassName(hiveConf));
}
}
@@ -715,17 +825,24 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
String tableName,
Function<HivePartitionStatistics, HivePartitionStatistics> update)
{
try (ThriftHMSClient client = getClient()) {
-
- Table originTable = getTable(dbName, tableName);
- Map<String, String> originParams = originTable.getParameters();
- HivePartitionStatistics updatedStats =
update.apply(HiveUtil.toHivePartitionStatistics(originParams));
-
- Table newTable = originTable.deepCopy();
- Map<String, String> newParams =
- HiveUtil.updateStatisticsParameters(originParams,
updatedStats.getCommonStatistics());
- newParams.put("transient_lastDdlTime",
String.valueOf(System.currentTimeMillis() / 1000));
- newTable.setParameters(newParams);
- client.client.alter_table(dbName, tableName, newTable);
+ try {
+ Table originTable = ugiDoAs(() ->
client.client.getTable(dbName, tableName));
+ Map<String, String> originParams = originTable.getParameters();
+ HivePartitionStatistics updatedStats =
update.apply(HiveUtil.toHivePartitionStatistics(originParams));
+
+ Table newTable = originTable.deepCopy();
+ Map<String, String> newParams =
+ HiveUtil.updateStatisticsParameters(originParams,
updatedStats.getCommonStatistics());
+ newParams.put("transient_lastDdlTime",
String.valueOf(System.currentTimeMillis() / 1000));
+ newTable.setParameters(newParams);
+ ugiDoAs(() -> {
+ client.client.alter_table(dbName, tableName, newTable);
+ return null;
+ });
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new RuntimeException("failed to update table statistics for
" + dbName + "." + tableName, e);
}
@@ -738,22 +855,30 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
String partitionName,
Function<HivePartitionStatistics, HivePartitionStatistics> update)
{
try (ThriftHMSClient client = getClient()) {
- List<Partition> partitions = client.client.getPartitionsByNames(
- dbName, tableName, ImmutableList.of(partitionName));
- if (partitions.size() != 1) {
- throw new RuntimeException("Metastore returned multiple
partitions for name: " + partitionName);
- }
+ try {
+ List<Partition> partitions = ugiDoAs(() ->
client.client.getPartitionsByNames(
+ dbName, tableName, ImmutableList.of(partitionName)));
+ if (partitions.size() != 1) {
+ throw new RuntimeException("Metastore returned multiple
partitions for name: " + partitionName);
+ }
- Partition originPartition = partitions.get(0);
- Map<String, String> originParams = originPartition.getParameters();
- HivePartitionStatistics updatedStats =
update.apply(HiveUtil.toHivePartitionStatistics(originParams));
+ Partition originPartition = partitions.get(0);
+ Map<String, String> originParams =
originPartition.getParameters();
+ HivePartitionStatistics updatedStats =
update.apply(HiveUtil.toHivePartitionStatistics(originParams));
- Partition modifiedPartition = originPartition.deepCopy();
- Map<String, String> newParams =
- HiveUtil.updateStatisticsParameters(originParams,
updatedStats.getCommonStatistics());
- newParams.put("transient_lastDdlTime",
String.valueOf(System.currentTimeMillis() / 1000));
- modifiedPartition.setParameters(newParams);
- client.client.alter_partition(dbName, tableName,
modifiedPartition);
+ Partition modifiedPartition = originPartition.deepCopy();
+ Map<String, String> newParams =
+ HiveUtil.updateStatisticsParameters(originParams,
updatedStats.getCommonStatistics());
+ newParams.put("transient_lastDdlTime",
String.valueOf(System.currentTimeMillis() / 1000));
+ modifiedPartition.setParameters(newParams);
+ ugiDoAs(() -> {
+ client.client.alter_partition(dbName, tableName,
modifiedPartition);
+ return null;
+ });
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new RuntimeException("failed to update table statistics for
" + dbName + "." + tableName, e);
}
@@ -762,10 +887,18 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
@Override
public void addPartitions(String dbName, String tableName,
List<HivePartitionWithStatistics> partitions) {
try (ThriftHMSClient client = getClient()) {
- List<Partition> hivePartitions = partitions.stream()
- .map(HiveUtil::toMetastoreApiPartition)
- .collect(Collectors.toList());
- client.client.add_partitions(hivePartitions);
+ try {
+ List<Partition> hivePartitions = partitions.stream()
+ .map(HiveUtil::toMetastoreApiPartition)
+ .collect(Collectors.toList());
+ ugiDoAs(() -> {
+ client.client.add_partitions(hivePartitions);
+ return null;
+ });
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new RuntimeException("failed to add partitions for " +
dbName + "." + tableName, e);
}
@@ -774,7 +907,15 @@ public class ThriftHMSCachedClient implements
HMSCachedClient {
@Override
public void dropPartition(String dbName, String tableName, List<String>
partitionValues, boolean deleteData) {
try (ThriftHMSClient client = getClient()) {
- client.client.dropPartition(dbName, tableName, partitionValues,
deleteData);
+ try {
+ ugiDoAs(() -> {
+ client.client.dropPartition(dbName, tableName,
partitionValues, deleteData);
+ return null;
+ });
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new RuntimeException("failed to drop partition for " +
dbName + "." + tableName, e);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/ThriftHMSCachedClientTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/ThriftHMSCachedClientTest.java
new file mode 100644
index 00000000000..46af20d72c9
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/ThriftHMSCachedClientTest.java
@@ -0,0 +1,416 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
+import org.apache.doris.datasource.NameMapping;
+import org.apache.doris.datasource.property.metastore.HMSBaseProperties;
+import org.apache.doris.info.TableNameInfo;
+
+import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
+import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Proxy;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ThriftHMSCachedClientTest {
+ private MockMetastoreClientProvider provider;
+
+ @Before
+ public void setUp() {
+ provider = new MockMetastoreClientProvider();
+ }
+
+ @Test
+ public void testPoolConfigKeepsBorrowValidationAndIdleEvictionDisabled() {
+ ThriftHMSCachedClient cachedClient = newClient(1);
+
+ GenericObjectPool<?> pool = getPool(cachedClient);
+ Assert.assertFalse(pool.getTestOnBorrow());
+ Assert.assertFalse(pool.getTestOnReturn());
+ Assert.assertFalse(pool.getTestWhileIdle());
+ Assert.assertEquals(60_000L, pool.getMaxWaitMillis());
+ Assert.assertEquals(-1L, pool.getTimeBetweenEvictionRunsMillis());
+ }
+
+ @Test
+ public void testPoolDisabledCreatesAndClosesClientPerBorrow() throws
Exception {
+ ThriftHMSCachedClient cachedClient = newClient(0);
+
+ Assert.assertNull(getPool(cachedClient));
+
+ Object firstBorrowed = borrowClient(cachedClient);
+ closeBorrowed(firstBorrowed);
+ Assert.assertEquals(1, provider.createdClients.get());
+ Assert.assertEquals(1, provider.closedClients.get());
+
+ Object secondBorrowed = borrowClient(cachedClient);
+ Assert.assertNotSame(firstBorrowed, secondBorrowed);
+ closeBorrowed(secondBorrowed);
+ Assert.assertEquals(2, provider.createdClients.get());
+ Assert.assertEquals(2, provider.closedClients.get());
+ }
+
+ @Test
+ public void testReturnObjectToPool() throws Exception {
+ ThriftHMSCachedClient cachedClient = newClient(1);
+
+ Object firstBorrowed = borrowClient(cachedClient);
+ closeBorrowed(firstBorrowed);
+
+ Assert.assertEquals(1, getPool(cachedClient).getNumIdle());
+ Assert.assertEquals(0, getPool(cachedClient).getNumActive());
+ Assert.assertEquals(1, provider.createdClients.get());
+ Assert.assertEquals(0, provider.closedClients.get());
+
+ Object secondBorrowed = borrowClient(cachedClient);
+ Assert.assertSame(firstBorrowed, secondBorrowed);
+ closeBorrowed(secondBorrowed);
+ }
+
+ @Test
+ public void testInvalidateBrokenObject() throws Exception {
+ ThriftHMSCachedClient cachedClient = newClient(1);
+
+ Object brokenBorrowed = borrowClient(cachedClient);
+ markBorrowedBroken(brokenBorrowed, new RuntimeException("broken"));
+ closeBorrowed(brokenBorrowed);
+
+ Assert.assertEquals(0, getPool(cachedClient).getNumIdle());
+ Assert.assertEquals(0, getPool(cachedClient).getNumActive());
+ Assert.assertEquals(1, provider.createdClients.get());
+ Assert.assertEquals(1, provider.closedClients.get());
+
+ Object nextBorrowed = borrowClient(cachedClient);
+ Assert.assertNotSame(brokenBorrowed, nextBorrowed);
+ Assert.assertEquals(2, provider.createdClients.get());
+ closeBorrowed(nextBorrowed);
+ }
+
+ @Test
+ public void testBorrowBlocksUntilObjectReturned() throws Exception {
+ ThriftHMSCachedClient cachedClient = newClient(1);
+ Object firstBorrowed = borrowClient(cachedClient);
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ Future<Object> waitingBorrow = executor.submit(() ->
borrowClient(cachedClient));
+ Thread.sleep(200L);
+ Assert.assertFalse(waitingBorrow.isDone());
+
+ closeBorrowed(firstBorrowed);
+
+ Object secondBorrowed = waitingBorrow.get(2, TimeUnit.SECONDS);
+ Assert.assertSame(firstBorrowed, secondBorrowed);
+ closeBorrowed(secondBorrowed);
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testCloseDestroysIdleObjectsAndRejectsBorrow() throws
Exception {
+ ThriftHMSCachedClient cachedClient = newClient(1);
+ Object borrowed = borrowClient(cachedClient);
+ closeBorrowed(borrowed);
+
+ cachedClient.close();
+
+ Assert.assertTrue(getPool(cachedClient).isClosed());
+ Assert.assertEquals(1, provider.closedClients.get());
+ Assert.assertThrows(IllegalStateException.class, () ->
borrowClient(cachedClient));
+ }
+
+ @Test
+ public void testCloseWhileObjectBorrowedClosesClientOnReturn() throws
Exception {
+ ThriftHMSCachedClient cachedClient = newClient(1);
+ Object borrowed = borrowClient(cachedClient);
+
+ cachedClient.close();
+ Assert.assertEquals(0, provider.closedClients.get());
+
+ closeBorrowed(borrowed);
+
+ Assert.assertEquals(1, provider.closedClients.get());
+ Assert.assertEquals(0, getPool(cachedClient).getNumIdle());
+ }
+
+ @Test
+ public void testGetMetastoreClientClassName() {
+ HiveConf hiveConf = new HiveConf();
+ Assert.assertEquals(HiveMetaStoreClient.class.getName(),
+ ThriftHMSCachedClient.getMetastoreClientClassName(hiveConf));
+
+ hiveConf.set(HMSBaseProperties.HIVE_METASTORE_TYPE,
HMSBaseProperties.GLUE_TYPE);
+ Assert.assertEquals(AWSCatalogMetastoreClient.class.getName(),
+ ThriftHMSCachedClient.getMetastoreClientClassName(hiveConf));
+
+ hiveConf.set(HMSBaseProperties.HIVE_METASTORE_TYPE,
HMSBaseProperties.DLF_TYPE);
+ Assert.assertEquals(ProxyMetaStoreClient.class.getName(),
+ ThriftHMSCachedClient.getMetastoreClientClassName(hiveConf));
+ }
+
+ @Test
+ public void testUpdateTableStatisticsDoesNotBorrowSecondClient() {
+ ThriftHMSCachedClient cachedClient = newClient(1);
+
+ cachedClient.updateTableStatistics("db1", "tbl1", statistics ->
statistics);
+
+ Assert.assertEquals(1, provider.createdClients.get());
+ Assert.assertEquals(1, getPool(cachedClient).getNumIdle());
+ Assert.assertEquals(0, getPool(cachedClient).getNumActive());
+ }
+
+ @Test
+ public void testAcquireSharedLockDoesNotBorrowSecondClient() {
+ provider.lockStates.add(LockState.WAITING);
+ provider.lockStates.add(LockState.ACQUIRED);
+ ThriftHMSCachedClient cachedClient = newClient(1);
+
+ cachedClient.acquireSharedLock("query-1", 1L, "user",
+ new TableNameInfo("db1", "tbl1"), Collections.emptyList(),
5_000L);
+
+ Assert.assertEquals(1, provider.createdClients.get());
+ Assert.assertEquals(1, provider.checkLockCalls.get());
+ Assert.assertEquals(1, getPool(cachedClient).getNumIdle());
+ Assert.assertEquals(0, getPool(cachedClient).getNumActive());
+ }
+
+ @Test
+ public void testUpdatePartitionStatisticsInvalidatesFailedClient() throws
Exception {
+ provider.alterPartitionFailure = new RuntimeException("alter partition
failed");
+ ThriftHMSCachedClient cachedClient = newClient(1);
+
+ RuntimeException exception =
Assert.assertThrows(RuntimeException.class,
+ () -> cachedClient.updatePartitionStatistics("db1", "tbl1",
"p1", statistics -> statistics));
+ Assert.assertTrue(exception.getMessage().contains("failed to update
table statistics"));
+ assertBrokenBorrowerIsNotReused(cachedClient);
+ }
+
+ @Test
+ public void testAddPartitionsInvalidatesFailedClient() throws Exception {
+ provider.addPartitionsFailure = new RuntimeException("add partitions
failed");
+ ThriftHMSCachedClient cachedClient = newClient(1);
+
+ RuntimeException exception =
Assert.assertThrows(RuntimeException.class,
+ () -> cachedClient.addPartitions("db1", "tbl1",
Collections.singletonList(newPartitionWithStatistics())));
+ Assert.assertTrue(exception.getMessage().contains("failed to add
partitions"));
+ assertBrokenBorrowerIsNotReused(cachedClient);
+ }
+
+ @Test
+ public void testDropPartitionInvalidatesFailedClient() throws Exception {
+ provider.dropPartitionFailure = new RuntimeException("drop partition
failed");
+ ThriftHMSCachedClient cachedClient = newClient(1);
+
+ RuntimeException exception =
Assert.assertThrows(RuntimeException.class,
+ () -> cachedClient.dropPartition("db1", "tbl1",
Collections.singletonList("p1"), false));
+ Assert.assertTrue(exception.getMessage().contains("failed to drop
partition"));
+ assertBrokenBorrowerIsNotReused(cachedClient);
+ }
+
+ private void assertBrokenBorrowerIsNotReused(ThriftHMSCachedClient
cachedClient) throws Exception {
+ Assert.assertEquals(0, getPool(cachedClient).getNumIdle());
+ Assert.assertEquals(0, getPool(cachedClient).getNumActive());
+ Assert.assertEquals(1, provider.createdClients.get());
+ Assert.assertEquals(1, provider.closedClients.get());
+
+ Object nextBorrowed = borrowClient(cachedClient);
+ Assert.assertEquals(2, provider.createdClients.get());
+ closeBorrowed(nextBorrowed);
+ }
+
+ private ThriftHMSCachedClient newClient(int poolSize) {
+ return newClient(new HiveConf(), poolSize);
+ }
+
+ private ThriftHMSCachedClient newClient(HiveConf hiveConf, int poolSize) {
+ return new ThriftHMSCachedClient(hiveConf, poolSize, new
ExecutionAuthenticator() {
+ }, provider);
+ }
+
+ private GenericObjectPool<?> getPool(ThriftHMSCachedClient cachedClient) {
+ return Deencapsulation.getField(cachedClient, "clientPool");
+ }
+
+ private Object borrowClient(ThriftHMSCachedClient cachedClient) {
+ return Deencapsulation.invoke(cachedClient, "getClient");
+ }
+
+ private void markBorrowedBroken(Object borrowedClient, Throwable
throwable) {
+ Deencapsulation.invoke(borrowedClient, "setThrowable", throwable);
+ }
+
+ private void closeBorrowed(Object borrowedClient) throws Exception {
+ ((AutoCloseable) borrowedClient).close();
+ }
+
+ private HivePartitionWithStatistics newPartitionWithStatistics() {
+ HivePartition partition = new HivePartition(
+ NameMapping.createForTest("db1", "tbl1"),
+ false,
+ "input-format",
+ "file:///tmp/part",
+ Collections.singletonList("p1"),
+ new HashMap<>(),
+ "output-format",
+ "serde",
+ Collections.singletonList(new FieldSchema("c1", "string",
"")));
+ return new HivePartitionWithStatistics("k1=v1", partition,
HivePartitionStatistics.EMPTY);
+ }
+
+ private static class MockMetastoreClientProvider implements
ThriftHMSCachedClient.MetaStoreClientProvider {
+ private final AtomicInteger createdClients = new AtomicInteger();
+ private final AtomicInteger closedClients = new AtomicInteger();
+ private final AtomicInteger checkLockCalls = new AtomicInteger();
+ private final Deque<LockState> lockStates = new ArrayDeque<>();
+
+ private volatile RuntimeException alterPartitionFailure;
+ private volatile RuntimeException addPartitionsFailure;
+ private volatile RuntimeException dropPartitionFailure;
+
+ @Override
+ public IMetaStoreClient create(HiveConf hiveConf) {
+ createdClients.incrementAndGet();
+ return (IMetaStoreClient) Proxy.newProxyInstance(
+ IMetaStoreClient.class.getClassLoader(),
+ new Class[] {IMetaStoreClient.class},
+ (proxy, method, args) -> handleMethod(proxy,
method.getName(), args, method.getReturnType()));
+ }
+
+ private Object handleMethod(Object proxy, String methodName, Object[]
args, Class<?> returnType) {
+ if ("close".equals(methodName)) {
+ closedClients.incrementAndGet();
+ return null;
+ }
+ if ("hashCode".equals(methodName)) {
+ return System.identityHashCode(proxy);
+ }
+ if ("equals".equals(methodName)) {
+ return proxy == args[0];
+ }
+ if ("toString".equals(methodName)) {
+ return "MockHmsClient";
+ }
+ if ("getTable".equals(methodName)) {
+ Table table = new Table();
+ table.setParameters(new HashMap<>());
+ return table;
+ }
+ if ("getPartitionsByNames".equals(methodName)) {
+ Partition partition = new Partition();
+ partition.setParameters(new HashMap<>());
+ return Collections.singletonList(partition);
+ }
+ if ("alter_partition".equals(methodName)) {
+ if (alterPartitionFailure != null) {
+ throw alterPartitionFailure;
+ }
+ return null;
+ }
+ if ("add_partitions".equals(methodName)) {
+ if (addPartitionsFailure != null) {
+ throw addPartitionsFailure;
+ }
+ return 1;
+ }
+ if ("dropPartition".equals(methodName)) {
+ if (dropPartitionFailure != null) {
+ throw dropPartitionFailure;
+ }
+ return true;
+ }
+ if ("lock".equals(methodName)) {
+ return newLockResponse(nextLockState());
+ }
+ if ("checkLock".equals(methodName)) {
+ checkLockCalls.incrementAndGet();
+ return newLockResponse(nextLockState());
+ }
+ return defaultValue(returnType);
+ }
+
+ private LockState nextLockState() {
+ synchronized (lockStates) {
+ if (lockStates.isEmpty()) {
+ return LockState.ACQUIRED;
+ }
+ return lockStates.removeFirst();
+ }
+ }
+
+ private LockResponse newLockResponse(LockState state) {
+ LockResponse response = new LockResponse();
+ response.setLockid(1L);
+ response.setState(state);
+ return response;
+ }
+
+ private Object defaultValue(Class<?> returnType) {
+ if (!returnType.isPrimitive()) {
+ return null;
+ }
+ if (returnType == boolean.class) {
+ return false;
+ }
+ if (returnType == byte.class) {
+ return (byte) 0;
+ }
+ if (returnType == short.class) {
+ return (short) 0;
+ }
+ if (returnType == int.class) {
+ return 0;
+ }
+ if (returnType == long.class) {
+ return 0L;
+ }
+ if (returnType == float.class) {
+ return 0F;
+ }
+ if (returnType == double.class) {
+ return 0D;
+ }
+ if (returnType == char.class) {
+ return '\0';
+ }
+ return null;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]