This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 528a964e5f9 [improvement](fe) Migrate HMS client pool to Commons Pool 
(#61553)
528a964e5f9 is described below

commit 528a964e5f9ca63bff0488b70d9e4b2c9a586fba
Author: Socrates <[email protected]>
AuthorDate: Wed Apr 1 06:57:11 2026 +0800

    [improvement](fe) Migrate HMS client pool to Commons Pool (#61553)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    This PR migrates `ThriftHMSCachedClient` from a self-managed queue-based
    HMS client pool to `commons-pool2` while keeping the external FE
    interface unchanged.
    
    It also clarifies the responsibility boundary of the HMS pool:
    - The pool only manages FE-side client lifecycle, including create,
    borrow/return, invalidate on failure, and destroy on close.
    - Hive-side socket lifetime and reconnect remain handled by
    `RetryingMetaStoreClient`.
    - The pool does not interpret `hive.metastore.client.socket.lifetime` or
    probe remote socket health.
    
    Additional changes in this PR:
    - Preserve throwable-based invalidation so that borrowed clients that have
    failed are invalidated instead of being returned to the pool.
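    - Fix nested-borrow hazards in `updateTableStatistics` and the lock-polling
    path: both borrowed a second client while still holding one, which can
    block until the borrow times out when the pool enforces a strict size limit.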
    - Add FE unit coverage for normal return, invalidation, a blocked borrow
    reusing the returned client, close behavior, and single-client nested-call
    behavior.
---
 .../datasource/hive/ThriftHMSCachedClient.java     | 307 +++++++++++----
 .../datasource/hive/ThriftHMSCachedClientTest.java | 416 +++++++++++++++++++++
 2 files changed, 640 insertions(+), 83 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index 840ad765587..c58fe3bb58f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -31,6 +31,11 @@ import 
com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -69,12 +74,10 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -88,32 +91,41 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
     private static final HiveMetaHookLoader DUMMY_HOOK_LOADER = t -> null;
     // -1 means no limit on the partitions returned.
     private static final short MAX_LIST_PARTITION_NUM = 
Config.max_hive_list_partition_num;
+    private static final long CLIENT_POOL_BORROW_TIMEOUT_MS = 60_000L;
 
-    private Queue<ThriftHMSClient> clientPool = new LinkedList<>();
-    private boolean isClosed = false;
-    private final int poolSize;
+    private final GenericObjectPool<ThriftHMSClient> clientPool;
+    private volatile boolean isClosed = false;
     private final HiveConf hiveConf;
-    private ExecutionAuthenticator executionAuthenticator;
+    private final ExecutionAuthenticator executionAuthenticator;
+    private final MetaStoreClientProvider metaStoreClientProvider;
 
     public ThriftHMSCachedClient(HiveConf hiveConf, int poolSize, 
ExecutionAuthenticator executionAuthenticator) {
-        Preconditions.checkArgument(poolSize > 0, poolSize);
+        this(hiveConf, poolSize, executionAuthenticator, new 
DefaultMetaStoreClientProvider());
+    }
+
+    ThriftHMSCachedClient(HiveConf hiveConf, int poolSize, 
ExecutionAuthenticator executionAuthenticator,
+            MetaStoreClientProvider metaStoreClientProvider) {
+        Preconditions.checkArgument(poolSize >= 0, poolSize);
         this.hiveConf = hiveConf;
-        this.poolSize = poolSize;
-        this.isClosed = false;
         this.executionAuthenticator = executionAuthenticator;
+        this.metaStoreClientProvider = 
Preconditions.checkNotNull(metaStoreClientProvider, "metaStoreClientProvider");
+        this.clientPool = poolSize == 0 ? null
+                : new GenericObjectPool<>(new ThriftHMSClientFactory(), 
createPoolConfig(poolSize));
     }
 
     @Override
     public void close() {
-        synchronized (clientPool) {
-            this.isClosed = true;
-            while (!clientPool.isEmpty()) {
-                try {
-                    clientPool.poll().close();
-                } catch (Exception e) {
-                    LOG.warn("failed to close thrift client", e);
-                }
-            }
+        if (isClosed) {
+            return;
+        }
+        isClosed = true;
+        if (clientPool == null) {
+            return;
+        }
+        try {
+            clientPool.close();
+        } catch (Exception e) {
+            LOG.warn("failed to close thrift client pool", e);
         }
     }
 
@@ -545,7 +557,7 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
                             "acquire lock timeout for txn " + txnId + " of 
query " + queryId + ", timeout(ms): "
                                     + timeoutMs);
                 }
-                response = checkLock(lockId);
+                response = checkLock(client, lockId);
             }
 
             if (response.getState() != LockState.ACQUIRED) {
@@ -599,15 +611,11 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
         }
     }
 
-    private LockResponse checkLock(long lockId) {
-        try (ThriftHMSClient client = getClient()) {
-            try {
-                return ugiDoAs(() -> client.client.checkLock(lockId));
-            } catch (Exception e) {
-                client.setThrowable(e);
-                throw e;
-            }
+    private LockResponse checkLock(ThriftHMSClient client, long lockId) {
+        try {
+            return ugiDoAs(() -> client.client.checkLock(lockId));
         } catch (Exception e) {
+            client.setThrowable(e);
             throw new RuntimeException("failed to check lock " + lockId, e);
         }
     }
@@ -636,53 +644,155 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
         return builder.build();
     }
 
+    /**
+     * The Doris HMS pool only manages client object lifecycle in FE:
+     * 1. Create clients.
+     * 2. Borrow and return clients.
+     * 3. Invalidate borrowed clients that have already failed.
+     * 4. Destroy clients when the pool is closed.
+     *
+     * The pool does not manage Hive-side socket lifetime or reconnect:
+     * 1. RetryingMetaStoreClient handles 
hive.metastore.client.socket.lifetime itself.
+     * 2. The pool does not interpret that config.
+     * 3. The pool does not probe remote socket health.
+     */
+    private GenericObjectPoolConfig createPoolConfig(int poolSize) {
+        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
+        config.setMaxTotal(poolSize);
+        config.setMaxIdle(poolSize);
+        config.setMinIdle(0);
+        config.setBlockWhenExhausted(true);
+        config.setMaxWaitMillis(CLIENT_POOL_BORROW_TIMEOUT_MS);
+        config.setTestOnBorrow(false);
+        config.setTestOnReturn(false);
+        config.setTestWhileIdle(false);
+        config.setTimeBetweenEvictionRunsMillis(-1L);
+        return config;
+    }
+
+    static String getMetastoreClientClassName(HiveConf hiveConf) {
+        String type = hiveConf.get(HMSBaseProperties.HIVE_METASTORE_TYPE);
+        if (HMSBaseProperties.DLF_TYPE.equalsIgnoreCase(type)) {
+            return ProxyMetaStoreClient.class.getName();
+        } else if (HMSBaseProperties.GLUE_TYPE.equalsIgnoreCase(type)) {
+            return AWSCatalogMetastoreClient.class.getName();
+        } else {
+            return HiveMetaStoreClient.class.getName();
+        }
+    }
+
+    private <T> T withSystemClassLoader(PrivilegedExceptionAction<T> action) 
throws Exception {
+        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
+            return action.run();
+        } finally {
+            Thread.currentThread().setContextClassLoader(classLoader);
+        }
+    }
+
+    private class ThriftHMSClientFactory extends 
BasePooledObjectFactory<ThriftHMSClient> {
+        @Override
+        public ThriftHMSClient create() throws Exception {
+            return createClient();
+        }
+
+        @Override
+        public PooledObject<ThriftHMSClient> wrap(ThriftHMSClient client) {
+            return new DefaultPooledObject<>(client);
+        }
+
+        @Override
+        public boolean validateObject(PooledObject<ThriftHMSClient> 
pooledObject) {
+            return !isClosed && pooledObject.getObject().isValid();
+        }
+
+        @Override
+        public void destroyObject(PooledObject<ThriftHMSClient> pooledObject) 
throws Exception {
+            pooledObject.getObject().destroy();
+        }
+    }
+
     private class ThriftHMSClient implements AutoCloseable {
         private final IMetaStoreClient client;
+        private volatile boolean destroyed;
         private volatile Throwable throwable;
 
-        private ThriftHMSClient(HiveConf hiveConf) throws MetaException {
-            String type = hiveConf.get(HMSBaseProperties.HIVE_METASTORE_TYPE);
-            if (HMSBaseProperties.DLF_TYPE.equalsIgnoreCase(type)) {
-                client = RetryingMetaStoreClient.getProxy(hiveConf, 
DUMMY_HOOK_LOADER,
-                        ProxyMetaStoreClient.class.getName());
-            } else if (HMSBaseProperties.GLUE_TYPE.equalsIgnoreCase(type)) {
-                client = RetryingMetaStoreClient.getProxy(hiveConf, 
DUMMY_HOOK_LOADER,
-                        AWSCatalogMetastoreClient.class.getName());
-            } else {
-                client = RetryingMetaStoreClient.getProxy(hiveConf, 
DUMMY_HOOK_LOADER,
-                        HiveMetaStoreClient.class.getName());
-            }
+        private ThriftHMSClient(IMetaStoreClient client) {
+            this.client = client;
         }
 
         public void setThrowable(Throwable throwable) {
             this.throwable = throwable;
         }
 
+        private boolean isValid() {
+            return !destroyed && throwable == null;
+        }
+
+        private void destroy() {
+            if (destroyed) {
+                return;
+            }
+            destroyed = true;
+            client.close();
+        }
+
         @Override
         public void close() throws Exception {
-            synchronized (clientPool) {
-                if (isClosed || throwable != null || clientPool.size() > 
poolSize) {
-                    client.close();
+            if (clientPool == null) {
+                destroy();
+                return;
+            }
+            if (isClosed) {
+                destroy();
+                return;
+            }
+            try {
+                if (throwable != null) {
+                    clientPool.invalidateObject(this);
                 } else {
-                    clientPool.offer(this);
+                    clientPool.returnObject(this);
                 }
+            } catch (IllegalStateException e) {
+                destroy();
+            } catch (Exception e) {
+                destroy();
+                throw e;
             }
         }
     }
 
     private ThriftHMSClient getClient() {
-        ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
         try {
-            
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
-            synchronized (clientPool) {
-                ThriftHMSClient client = clientPool.poll();
-                if (client == null) {
-                    return ugiDoAs(() -> new ThriftHMSClient(hiveConf));
-                }
-                return client;
+            Preconditions.checkState(!isClosed, "HMS client pool is closed");
+            if (clientPool == null) {
+                return createClient();
             }
-        } finally {
-            Thread.currentThread().setContextClassLoader(classLoader);
+            return clientPool.borrowObject();
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new HMSClientException("failed to borrow hms client from 
pool", e);
+        }
+    }
+
+    private ThriftHMSClient createClient() throws Exception {
+        return withSystemClassLoader(() -> ugiDoAs(
+                () -> new 
ThriftHMSClient(metaStoreClientProvider.create(hiveConf))));
+    }
+
+    // Keep the HMS client creation behind an injectable seam so unit tests 
can verify
+    // Doris-side pool behavior without relying on Hive static construction 
internals.
+    interface MetaStoreClientProvider {
+        IMetaStoreClient create(HiveConf hiveConf) throws MetaException;
+    }
+
+    private static class DefaultMetaStoreClientProvider implements 
MetaStoreClientProvider {
+        @Override
+        public IMetaStoreClient create(HiveConf hiveConf) throws MetaException 
{
+            return RetryingMetaStoreClient.getProxy(hiveConf, 
DUMMY_HOOK_LOADER,
+                    getMetastoreClientClassName(hiveConf));
         }
     }
 
@@ -715,17 +825,24 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
             String tableName,
             Function<HivePartitionStatistics, HivePartitionStatistics> update) 
{
         try (ThriftHMSClient client = getClient()) {
-
-            Table originTable = getTable(dbName, tableName);
-            Map<String, String> originParams = originTable.getParameters();
-            HivePartitionStatistics updatedStats = 
update.apply(HiveUtil.toHivePartitionStatistics(originParams));
-
-            Table newTable = originTable.deepCopy();
-            Map<String, String> newParams =
-                    HiveUtil.updateStatisticsParameters(originParams, 
updatedStats.getCommonStatistics());
-            newParams.put("transient_lastDdlTime", 
String.valueOf(System.currentTimeMillis() / 1000));
-            newTable.setParameters(newParams);
-            client.client.alter_table(dbName, tableName, newTable);
+            try {
+                Table originTable = ugiDoAs(() -> 
client.client.getTable(dbName, tableName));
+                Map<String, String> originParams = originTable.getParameters();
+                HivePartitionStatistics updatedStats = 
update.apply(HiveUtil.toHivePartitionStatistics(originParams));
+
+                Table newTable = originTable.deepCopy();
+                Map<String, String> newParams =
+                        HiveUtil.updateStatisticsParameters(originParams, 
updatedStats.getCommonStatistics());
+                newParams.put("transient_lastDdlTime", 
String.valueOf(System.currentTimeMillis() / 1000));
+                newTable.setParameters(newParams);
+                ugiDoAs(() -> {
+                    client.client.alter_table(dbName, tableName, newTable);
+                    return null;
+                });
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new RuntimeException("failed to update table statistics for 
" + dbName + "." + tableName, e);
         }
@@ -738,22 +855,30 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
             String partitionName,
             Function<HivePartitionStatistics, HivePartitionStatistics> update) 
{
         try (ThriftHMSClient client = getClient()) {
-            List<Partition> partitions = client.client.getPartitionsByNames(
-                    dbName, tableName, ImmutableList.of(partitionName));
-            if (partitions.size() != 1) {
-                throw new RuntimeException("Metastore returned multiple 
partitions for name: " + partitionName);
-            }
+            try {
+                List<Partition> partitions = ugiDoAs(() -> 
client.client.getPartitionsByNames(
+                        dbName, tableName, ImmutableList.of(partitionName)));
+                if (partitions.size() != 1) {
+                    throw new RuntimeException("Metastore returned multiple 
partitions for name: " + partitionName);
+                }
 
-            Partition originPartition = partitions.get(0);
-            Map<String, String> originParams = originPartition.getParameters();
-            HivePartitionStatistics updatedStats = 
update.apply(HiveUtil.toHivePartitionStatistics(originParams));
+                Partition originPartition = partitions.get(0);
+                Map<String, String> originParams = 
originPartition.getParameters();
+                HivePartitionStatistics updatedStats = 
update.apply(HiveUtil.toHivePartitionStatistics(originParams));
 
-            Partition modifiedPartition = originPartition.deepCopy();
-            Map<String, String> newParams =
-                    HiveUtil.updateStatisticsParameters(originParams, 
updatedStats.getCommonStatistics());
-            newParams.put("transient_lastDdlTime", 
String.valueOf(System.currentTimeMillis() / 1000));
-            modifiedPartition.setParameters(newParams);
-            client.client.alter_partition(dbName, tableName, 
modifiedPartition);
+                Partition modifiedPartition = originPartition.deepCopy();
+                Map<String, String> newParams =
+                        HiveUtil.updateStatisticsParameters(originParams, 
updatedStats.getCommonStatistics());
+                newParams.put("transient_lastDdlTime", 
String.valueOf(System.currentTimeMillis() / 1000));
+                modifiedPartition.setParameters(newParams);
+                ugiDoAs(() -> {
+                    client.client.alter_partition(dbName, tableName, 
modifiedPartition);
+                    return null;
+                });
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new RuntimeException("failed to update table statistics for 
" + dbName + "." + tableName, e);
         }
@@ -762,10 +887,18 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
     @Override
     public void addPartitions(String dbName, String tableName, 
List<HivePartitionWithStatistics> partitions) {
         try (ThriftHMSClient client = getClient()) {
-            List<Partition> hivePartitions = partitions.stream()
-                    .map(HiveUtil::toMetastoreApiPartition)
-                    .collect(Collectors.toList());
-            client.client.add_partitions(hivePartitions);
+            try {
+                List<Partition> hivePartitions = partitions.stream()
+                        .map(HiveUtil::toMetastoreApiPartition)
+                        .collect(Collectors.toList());
+                ugiDoAs(() -> {
+                    client.client.add_partitions(hivePartitions);
+                    return null;
+                });
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new RuntimeException("failed to add partitions for " + 
dbName + "." + tableName, e);
         }
@@ -774,7 +907,15 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
     @Override
     public void dropPartition(String dbName, String tableName, List<String> 
partitionValues, boolean deleteData) {
         try (ThriftHMSClient client = getClient()) {
-            client.client.dropPartition(dbName, tableName, partitionValues, 
deleteData);
+            try {
+                ugiDoAs(() -> {
+                    client.client.dropPartition(dbName, tableName, 
partitionValues, deleteData);
+                    return null;
+                });
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new RuntimeException("failed to drop partition for " + 
dbName + "." + tableName, e);
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/ThriftHMSCachedClientTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/ThriftHMSCachedClientTest.java
new file mode 100644
index 00000000000..46af20d72c9
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/ThriftHMSCachedClientTest.java
@@ -0,0 +1,416 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
+import org.apache.doris.datasource.NameMapping;
+import org.apache.doris.datasource.property.metastore.HMSBaseProperties;
+import org.apache.doris.info.TableNameInfo;
+
+import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
+import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Proxy;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ThriftHMSCachedClientTest {
+    private MockMetastoreClientProvider provider;
+
+    @Before
+    public void setUp() {
+        provider = new MockMetastoreClientProvider();
+    }
+
+    @Test
+    public void testPoolConfigKeepsBorrowValidationAndIdleEvictionDisabled() {
+        ThriftHMSCachedClient cachedClient = newClient(1);
+
+        GenericObjectPool<?> pool = getPool(cachedClient);
+        Assert.assertFalse(pool.getTestOnBorrow());
+        Assert.assertFalse(pool.getTestOnReturn());
+        Assert.assertFalse(pool.getTestWhileIdle());
+        Assert.assertEquals(60_000L, pool.getMaxWaitMillis());
+        Assert.assertEquals(-1L, pool.getTimeBetweenEvictionRunsMillis());
+    }
+
+    @Test
+    public void testPoolDisabledCreatesAndClosesClientPerBorrow() throws 
Exception {
+        ThriftHMSCachedClient cachedClient = newClient(0);
+
+        Assert.assertNull(getPool(cachedClient));
+
+        Object firstBorrowed = borrowClient(cachedClient);
+        closeBorrowed(firstBorrowed);
+        Assert.assertEquals(1, provider.createdClients.get());
+        Assert.assertEquals(1, provider.closedClients.get());
+
+        Object secondBorrowed = borrowClient(cachedClient);
+        Assert.assertNotSame(firstBorrowed, secondBorrowed);
+        closeBorrowed(secondBorrowed);
+        Assert.assertEquals(2, provider.createdClients.get());
+        Assert.assertEquals(2, provider.closedClients.get());
+    }
+
+    @Test
+    public void testReturnObjectToPool() throws Exception {
+        ThriftHMSCachedClient cachedClient = newClient(1);
+
+        Object firstBorrowed = borrowClient(cachedClient);
+        closeBorrowed(firstBorrowed);
+
+        Assert.assertEquals(1, getPool(cachedClient).getNumIdle());
+        Assert.assertEquals(0, getPool(cachedClient).getNumActive());
+        Assert.assertEquals(1, provider.createdClients.get());
+        Assert.assertEquals(0, provider.closedClients.get());
+
+        Object secondBorrowed = borrowClient(cachedClient);
+        Assert.assertSame(firstBorrowed, secondBorrowed);
+        closeBorrowed(secondBorrowed);
+    }
+
+    @Test
+    public void testInvalidateBrokenObject() throws Exception {
+        ThriftHMSCachedClient cachedClient = newClient(1);
+
+        Object brokenBorrowed = borrowClient(cachedClient);
+        markBorrowedBroken(brokenBorrowed, new RuntimeException("broken"));
+        closeBorrowed(brokenBorrowed);
+
+        Assert.assertEquals(0, getPool(cachedClient).getNumIdle());
+        Assert.assertEquals(0, getPool(cachedClient).getNumActive());
+        Assert.assertEquals(1, provider.createdClients.get());
+        Assert.assertEquals(1, provider.closedClients.get());
+
+        Object nextBorrowed = borrowClient(cachedClient);
+        Assert.assertNotSame(brokenBorrowed, nextBorrowed);
+        Assert.assertEquals(2, provider.createdClients.get());
+        closeBorrowed(nextBorrowed);
+    }
+
+    @Test
+    public void testBorrowBlocksUntilObjectReturned() throws Exception {
+        ThriftHMSCachedClient cachedClient = newClient(1);
+        Object firstBorrowed = borrowClient(cachedClient);
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            Future<Object> waitingBorrow = executor.submit(() -> 
borrowClient(cachedClient));
+            Thread.sleep(200L);
+            Assert.assertFalse(waitingBorrow.isDone());
+
+            closeBorrowed(firstBorrowed);
+
+            Object secondBorrowed = waitingBorrow.get(2, TimeUnit.SECONDS);
+            Assert.assertSame(firstBorrowed, secondBorrowed);
+            closeBorrowed(secondBorrowed);
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testCloseDestroysIdleObjectsAndRejectsBorrow() throws 
Exception {
+        ThriftHMSCachedClient cachedClient = newClient(1);
+        Object borrowed = borrowClient(cachedClient);
+        closeBorrowed(borrowed);
+
+        cachedClient.close();
+
+        Assert.assertTrue(getPool(cachedClient).isClosed());
+        Assert.assertEquals(1, provider.closedClients.get());
+        Assert.assertThrows(IllegalStateException.class, () -> 
borrowClient(cachedClient));
+    }
+
+    @Test
+    public void testCloseWhileObjectBorrowedClosesClientOnReturn() throws 
Exception {
+        ThriftHMSCachedClient cachedClient = newClient(1);
+        Object borrowed = borrowClient(cachedClient);
+
+        cachedClient.close();
+        Assert.assertEquals(0, provider.closedClients.get());
+
+        closeBorrowed(borrowed);
+
+        Assert.assertEquals(1, provider.closedClients.get());
+        Assert.assertEquals(0, getPool(cachedClient).getNumIdle());
+    }
+
+    @Test
+    public void testGetMetastoreClientClassName() {
+        HiveConf hiveConf = new HiveConf();
+        Assert.assertEquals(HiveMetaStoreClient.class.getName(),
+                ThriftHMSCachedClient.getMetastoreClientClassName(hiveConf));
+
+        hiveConf.set(HMSBaseProperties.HIVE_METASTORE_TYPE, 
HMSBaseProperties.GLUE_TYPE);
+        Assert.assertEquals(AWSCatalogMetastoreClient.class.getName(),
+                ThriftHMSCachedClient.getMetastoreClientClassName(hiveConf));
+
+        hiveConf.set(HMSBaseProperties.HIVE_METASTORE_TYPE, 
HMSBaseProperties.DLF_TYPE);
+        Assert.assertEquals(ProxyMetaStoreClient.class.getName(),
+                ThriftHMSCachedClient.getMetastoreClientClassName(hiveConf));
+    }
+
+    @Test
+    public void testUpdateTableStatisticsDoesNotBorrowSecondClient() {
+        ThriftHMSCachedClient cachedClient = newClient(1);
+
+        cachedClient.updateTableStatistics("db1", "tbl1", statistics -> 
statistics);
+
+        Assert.assertEquals(1, provider.createdClients.get());
+        Assert.assertEquals(1, getPool(cachedClient).getNumIdle());
+        Assert.assertEquals(0, getPool(cachedClient).getNumActive());
+    }
+
+    @Test
+    public void testAcquireSharedLockDoesNotBorrowSecondClient() {
+        provider.lockStates.add(LockState.WAITING);
+        provider.lockStates.add(LockState.ACQUIRED);
+        ThriftHMSCachedClient cachedClient = newClient(1);
+
+        cachedClient.acquireSharedLock("query-1", 1L, "user",
+                new TableNameInfo("db1", "tbl1"), Collections.emptyList(), 
5_000L);
+
+        Assert.assertEquals(1, provider.createdClients.get());
+        Assert.assertEquals(1, provider.checkLockCalls.get());
+        Assert.assertEquals(1, getPool(cachedClient).getNumIdle());
+        Assert.assertEquals(0, getPool(cachedClient).getNumActive());
+    }
+
+    @Test
+    public void testUpdatePartitionStatisticsInvalidatesFailedClient() throws 
Exception {
+        provider.alterPartitionFailure = new RuntimeException("alter partition 
failed");
+        ThriftHMSCachedClient cachedClient = newClient(1);
+
+        RuntimeException exception = 
Assert.assertThrows(RuntimeException.class,
+                () -> cachedClient.updatePartitionStatistics("db1", "tbl1", 
"p1", statistics -> statistics));
+        Assert.assertTrue(exception.getMessage().contains("failed to update 
table statistics"));
+        assertBrokenBorrowerIsNotReused(cachedClient);
+    }
+
+    @Test
+    public void testAddPartitionsInvalidatesFailedClient() throws Exception {
+        provider.addPartitionsFailure = new RuntimeException("add partitions 
failed");
+        ThriftHMSCachedClient cachedClient = newClient(1);
+
+        RuntimeException exception = 
Assert.assertThrows(RuntimeException.class,
+                () -> cachedClient.addPartitions("db1", "tbl1", 
Collections.singletonList(newPartitionWithStatistics())));
+        Assert.assertTrue(exception.getMessage().contains("failed to add 
partitions"));
+        assertBrokenBorrowerIsNotReused(cachedClient);
+    }
+
+    /**
+     * Makes the mock throw from {@code dropPartition}, then expects the cached
+     * client's {@code dropPartition} to fail with a message containing
+     * "failed to drop partition" and the failed client to be discarded.
+     */
+    @Test
+    public void testDropPartitionInvalidatesFailedClient() throws Exception {
+        provider.dropPartitionFailure = new RuntimeException("drop partition failed");
+        ThriftHMSCachedClient client = newClient(1);
+
+        RuntimeException thrown = Assert.assertThrows(RuntimeException.class,
+                () -> client.dropPartition("db1", "tbl1", Collections.singletonList("p1"), false));
+
+        String message = thrown.getMessage();
+        Assert.assertTrue(message.contains("failed to drop partition"));
+        assertBrokenBorrowerIsNotReused(client);
+    }
+
+    /**
+     * Shared post-condition for the failure tests: the pool reports no idle and no
+     * active clients, exactly one client was created and closed so far, and the next
+     * borrow brings the number of created clients to two.
+     */
+    private void assertBrokenBorrowerIsNotReused(ThriftHMSCachedClient cachedClient) throws Exception {
+        GenericObjectPool<?> pool = getPool(cachedClient);
+        Assert.assertEquals(0, pool.getNumIdle());
+        Assert.assertEquals(0, pool.getNumActive());
+        Assert.assertEquals(1, provider.createdClients.get());
+        Assert.assertEquals(1, provider.closedClients.get());
+
+        // Borrowing again must go through the provider once more.
+        Object freshBorrower = borrowClient(cachedClient);
+        Assert.assertEquals(2, provider.createdClients.get());
+        closeBorrowed(freshBorrower);
+    }
+
+    /** Builds a cached client with a freshly constructed {@link HiveConf} and the given pool size. */
+    private ThriftHMSCachedClient newClient(int poolSize) {
+        HiveConf freshConf = new HiveConf();
+        return newClient(freshConf, poolSize);
+    }
+
+    /**
+     * Builds a cached client from the given configuration and pool size, wired to the
+     * test's mock provider and an anonymous {@link ExecutionAuthenticator} with no overrides.
+     */
+    private ThriftHMSCachedClient newClient(HiveConf hiveConf, int poolSize) {
+        ExecutionAuthenticator authenticator = new ExecutionAuthenticator() {
+        };
+        return new ThriftHMSCachedClient(hiveConf, poolSize, authenticator, provider);
+    }
+
+    /** Reads the {@code clientPool} field of the cached client via {@code Deencapsulation}. */
+    private GenericObjectPool<?> getPool(ThriftHMSCachedClient cachedClient) {
+        GenericObjectPool<?> pool = Deencapsulation.getField(cachedClient, "clientPool");
+        return pool;
+    }
+
+    /** Invokes the cached client's {@code getClient} method via {@code Deencapsulation} and returns the result. */
+    private Object borrowClient(ThriftHMSCachedClient cachedClient) {
+        Object borrowed = Deencapsulation.invoke(cachedClient, "getClient");
+        return borrowed;
+    }
+
+    private void markBorrowedBroken(Object borrowedClient, Throwable 
throwable) {
+        Deencapsulation.invoke(borrowedClient, "setThrowable", throwable);
+    }
+
+    /** Casts the borrowed client to {@link AutoCloseable} and closes it. */
+    private void closeBorrowed(Object borrowedClient) throws Exception {
+        AutoCloseable closeable = (AutoCloseable) borrowedClient;
+        closeable.close();
+    }
+
+    /**
+     * Creates a fixture partition for {@code db1.tbl1} with a single string column
+     * {@code c1}, paired with {@link HivePartitionStatistics#EMPTY} under the name
+     * {@code k1=v1}.
+     */
+    private HivePartitionWithStatistics newPartitionWithStatistics() {
+        NameMapping nameMapping = NameMapping.createForTest("db1", "tbl1");
+        FieldSchema column = new FieldSchema("c1", "string", "");
+        HivePartition fixture = new HivePartition(
+                nameMapping,
+                false,
+                "input-format",
+                "file:///tmp/part",
+                Collections.singletonList("p1"),
+                new HashMap<>(),
+                "output-format",
+                "serde",
+                Collections.singletonList(column));
+        return new HivePartitionWithStatistics("k1=v1", fixture, HivePartitionStatistics.EMPTY);
+    }
+
+    private static class MockMetastoreClientProvider implements 
ThriftHMSCachedClient.MetaStoreClientProvider {
+        private final AtomicInteger createdClients = new AtomicInteger();
+        private final AtomicInteger closedClients = new AtomicInteger();
+        private final AtomicInteger checkLockCalls = new AtomicInteger();
+        private final Deque<LockState> lockStates = new ArrayDeque<>();
+
+        private volatile RuntimeException alterPartitionFailure;
+        private volatile RuntimeException addPartitionsFailure;
+        private volatile RuntimeException dropPartitionFailure;
+
+        @Override
+        public IMetaStoreClient create(HiveConf hiveConf) {
+            createdClients.incrementAndGet();
+            return (IMetaStoreClient) Proxy.newProxyInstance(
+                    IMetaStoreClient.class.getClassLoader(),
+                    new Class[] {IMetaStoreClient.class},
+                    (proxy, method, args) -> handleMethod(proxy, 
method.getName(), args, method.getReturnType()));
+        }
+
+        private Object handleMethod(Object proxy, String methodName, Object[] 
args, Class<?> returnType) {
+            if ("close".equals(methodName)) {
+                closedClients.incrementAndGet();
+                return null;
+            }
+            if ("hashCode".equals(methodName)) {
+                return System.identityHashCode(proxy);
+            }
+            if ("equals".equals(methodName)) {
+                return proxy == args[0];
+            }
+            if ("toString".equals(methodName)) {
+                return "MockHmsClient";
+            }
+            if ("getTable".equals(methodName)) {
+                Table table = new Table();
+                table.setParameters(new HashMap<>());
+                return table;
+            }
+            if ("getPartitionsByNames".equals(methodName)) {
+                Partition partition = new Partition();
+                partition.setParameters(new HashMap<>());
+                return Collections.singletonList(partition);
+            }
+            if ("alter_partition".equals(methodName)) {
+                if (alterPartitionFailure != null) {
+                    throw alterPartitionFailure;
+                }
+                return null;
+            }
+            if ("add_partitions".equals(methodName)) {
+                if (addPartitionsFailure != null) {
+                    throw addPartitionsFailure;
+                }
+                return 1;
+            }
+            if ("dropPartition".equals(methodName)) {
+                if (dropPartitionFailure != null) {
+                    throw dropPartitionFailure;
+                }
+                return true;
+            }
+            if ("lock".equals(methodName)) {
+                return newLockResponse(nextLockState());
+            }
+            if ("checkLock".equals(methodName)) {
+                checkLockCalls.incrementAndGet();
+                return newLockResponse(nextLockState());
+            }
+            return defaultValue(returnType);
+        }
+
+        private LockState nextLockState() {
+            synchronized (lockStates) {
+                if (lockStates.isEmpty()) {
+                    return LockState.ACQUIRED;
+                }
+                return lockStates.removeFirst();
+            }
+        }
+
+        private LockResponse newLockResponse(LockState state) {
+            LockResponse response = new LockResponse();
+            response.setLockid(1L);
+            response.setState(state);
+            return response;
+        }
+
+        private Object defaultValue(Class<?> returnType) {
+            if (!returnType.isPrimitive()) {
+                return null;
+            }
+            if (returnType == boolean.class) {
+                return false;
+            }
+            if (returnType == byte.class) {
+                return (byte) 0;
+            }
+            if (returnType == short.class) {
+                return (short) 0;
+            }
+            if (returnType == int.class) {
+                return 0;
+            }
+            if (returnType == long.class) {
+                return 0L;
+            }
+            if (returnType == float.class) {
+                return 0F;
+            }
+            if (returnType == double.class) {
+                return 0D;
+            }
+            if (returnType == char.class) {
+                return '\0';
+            }
+            return null;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to