This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 965412b829 [hive] Lazily initialize HMS client pool in HiveCatalog (#7346)
965412b829 is described below

commit 965412b829836ec52b56d99368b0bb5a74d182c4
Author: WenjunMin <[email protected]>
AuthorDate: Sat Mar 7 11:00:32 2026 +0800

    [hive] Lazily initialize HMS client pool in HiveCatalog (#7346)
---
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 254 ++++++++++++---------
 .../paimon/hive/pool/TestCachedClientPool.java     |  15 +-
 2 files changed, 152 insertions(+), 117 deletions(-)

diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 38ecd9372d..ac0706b40b 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -157,7 +157,7 @@ public class HiveCatalog extends AbstractCatalog {
     private final HiveConf hiveConf;
     private final String clientClassName;
     private final Options options;
-    private final ClientPool<IMetaStoreClient, TException> clients;
+    private volatile ClientPool<IMetaStoreClient, TException> clients;
     private final String warehouse;
 
     private final LocationHelper locationHelper;
@@ -188,8 +188,20 @@ public class HiveCatalog extends AbstractCatalog {
             hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, 
warehouse);
             locationHelper = new StorageLocationHelper();
         }
+    }
 
-        this.clients = new CachedClientPool(hiveConf, options, 
clientClassName);
+    private ClientPool<IMetaStoreClient, TException> clients() {
+        ClientPool<IMetaStoreClient, TException> local = clients;
+        if (local == null) {
+            synchronized (this) {
+                local = clients;
+                if (local == null) {
+                    local = new CachedClientPool(hiveConf, options, 
clientClassName);
+                    clients = local;
+                }
+            }
+        }
+        return local;
     }
 
     private boolean formatTableDisabled() {
@@ -246,25 +258,28 @@ public class HiveCatalog extends AbstractCatalog {
             String databaseName = identifier.getDatabaseName();
             String tableName = identifier.getTableName();
             Optional<Path> tablePath =
-                    clients.run(
-                            client -> {
-                                if (table != null) {
-                                    String location = 
locationHelper.getTableLocation(table);
-                                    if (location != null) {
-                                        return Optional.of(new Path(location));
-                                    }
-                                } else {
-                                    // If the table does not exist,
-                                    // we should use the database path to 
generate the table path.
-                                    String dbLocation =
-                                            locationHelper.getDatabaseLocation(
-                                                    
client.getDatabase(databaseName));
-                                    if (dbLocation != null) {
-                                        return Optional.of(new 
Path(dbLocation, tableName));
-                                    }
-                                }
-                                return Optional.empty();
-                            });
+                    clients()
+                            .run(
+                                    client -> {
+                                        if (table != null) {
+                                            String location =
+                                                    
locationHelper.getTableLocation(table);
+                                            if (location != null) {
+                                                return Optional.of(new 
Path(location));
+                                            }
+                                        } else {
+                                            // If the table does not exist,
+                                            // we should use the database path 
to generate the table
+                                            // path.
+                                            String dbLocation =
+                                                    
locationHelper.getDatabaseLocation(
+                                                            
client.getDatabase(databaseName));
+                                            if (dbLocation != null) {
+                                                return Optional.of(new 
Path(dbLocation, tableName));
+                                            }
+                                        }
+                                        return Optional.empty();
+                                    });
             return tablePath.orElse(super.getTableLocation(identifier));
         } catch (TException e) {
             throw new RuntimeException("Can not get table " + identifier + " 
from metastore.", e);
@@ -278,7 +293,7 @@ public class HiveCatalog extends AbstractCatalog {
     @Override
     public List<String> listDatabases() {
         try {
-            return clients.run(IMetaStoreClient::getAllDatabases);
+            return clients().run(IMetaStoreClient::getAllDatabases);
         } catch (TException e) {
             throw new RuntimeException("Failed to list all databases", e);
         } catch (InterruptedException e) {
@@ -297,7 +312,7 @@ public class HiveCatalog extends AbstractCatalog {
                             : new Path(database.getLocationUri());
             locationHelper.createPathIfRequired(databasePath, 
fileIO(databasePath));
             locationHelper.specifyDatabaseLocation(databasePath, database);
-            clients.execute(client -> client.createDatabase(database));
+            clients().execute(client -> client.createDatabase(database));
         } catch (TException | IOException e) {
             throw new RuntimeException("Failed to create database " + name, e);
         } catch (InterruptedException e) {
@@ -330,7 +345,7 @@ public class HiveCatalog extends AbstractCatalog {
     public org.apache.paimon.catalog.Database getDatabaseImpl(String name)
             throws DatabaseNotExistException {
         try {
-            Database database = clients.run(client -> 
client.getDatabase(name));
+            Database database = clients().run(client -> 
client.getDatabase(name));
             Map<String, String> options = new 
HashMap<>(database.getParameters());
             if (database.getDescription() != null) {
                 options.put(COMMENT_PROP, database.getDescription());
@@ -416,7 +431,7 @@ public class HiveCatalog extends AbstractCatalog {
             hivePartitions.add(hivePartition);
         }
         try {
-            clients.execute(client -> client.add_partitions(hivePartitions, 
true, false));
+            clients().execute(client -> client.add_partitions(hivePartitions, 
true, false));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -436,13 +451,14 @@ public class HiveCatalog extends AbstractCatalog {
             for (Map<String, String> part : metaPartitions) {
                 List<String> partitionValues = new ArrayList<>(part.values());
                 try {
-                    clients.execute(
-                            client ->
-                                    client.dropPartition(
-                                            identifier.getDatabaseName(),
-                                            identifier.getTableName(),
-                                            partitionValues,
-                                            false));
+                    clients()
+                            .execute(
+                                    client ->
+                                            client.dropPartition(
+                                                    
identifier.getDatabaseName(),
+                                                    identifier.getTableName(),
+                                                    partitionValues,
+                                                    false));
                 } catch (NoSuchObjectException e) {
                     // do nothing if the partition not exists
                 } catch (Exception e) {
@@ -500,22 +516,24 @@ public class HiveCatalog extends AbstractCatalog {
 
                 try {
                     Partition hivePartition =
-                            clients.run(
-                                    client ->
-                                            client.getPartition(
-                                                    
identifier.getDatabaseName(),
-                                                    identifier.getObjectName(),
-                                                    partitionValues));
+                            clients()
+                                    .run(
+                                            client ->
+                                                    client.getPartition(
+                                                            
identifier.getDatabaseName(),
+                                                            
identifier.getObjectName(),
+                                                            partitionValues));
                     hivePartition.setValues(partitionValues);
                     hivePartition.setLastAccessTime(
                             (int) (partition.lastFileCreationTime() / 1000));
                     hivePartition.getParameters().putAll(statistic);
-                    clients.execute(
-                            client ->
-                                    client.alter_partition(
-                                            identifier.getDatabaseName(),
-                                            identifier.getObjectName(),
-                                            hivePartition));
+                    clients()
+                            .execute(
+                                    client ->
+                                            client.alter_partition(
+                                                    
identifier.getDatabaseName(),
+                                                    identifier.getObjectName(),
+                                                    hivePartition));
                 } catch (NoSuchObjectException e) {
                     // do nothing if the partition not exists
                 } catch (Exception e) {
@@ -529,16 +547,17 @@ public class HiveCatalog extends AbstractCatalog {
     public void markDonePartitions(Identifier identifier, List<Map<String, 
String>> partitions)
             throws TableNotExistException {
         try {
-            clients.execute(
-                    client -> {
-                        for (Map<String, String> partition : partitions) {
-                            client.markPartitionForEvent(
-                                    identifier.getDatabaseName(),
-                                    identifier.getTableName(),
-                                    partition,
-                                    PartitionEventType.LOAD_DONE);
-                        }
-                    });
+            clients()
+                    .execute(
+                            client -> {
+                                for (Map<String, String> partition : 
partitions) {
+                                    client.markPartitionForEvent(
+                                            identifier.getDatabaseName(),
+                                            identifier.getTableName(),
+                                            partition,
+                                            PartitionEventType.LOAD_DONE);
+                                }
+                            });
         } catch (NoSuchObjectException e) {
             // do nothing if the partition not exists
         } catch (UnknownTableException e) {
@@ -598,12 +617,13 @@ public class HiveCatalog extends AbstractCatalog {
     @VisibleForTesting
     public List<Partition> listPartitionsFromHms(Identifier identifier)
             throws TException, InterruptedException {
-        return clients.run(
-                client ->
-                        client.listPartitions(
-                                identifier.getDatabaseName(),
-                                identifier.getTableName(),
-                                Short.MAX_VALUE));
+        return clients()
+                .run(
+                        client ->
+                                client.listPartitions(
+                                        identifier.getDatabaseName(),
+                                        identifier.getTableName(),
+                                        Short.MAX_VALUE));
     }
 
     private List<Map<String, String>> removePartitionsExistsInOtherBranches(
@@ -647,10 +667,10 @@ public class HiveCatalog extends AbstractCatalog {
     @Override
     protected void dropDatabaseImpl(String name) {
         try {
-            Database database = clients.run(client -> 
client.getDatabase(name));
+            Database database = clients().run(client -> 
client.getDatabase(name));
             Path location = new 
Path(locationHelper.getDatabaseLocation(database));
             locationHelper.dropPathIfRequired(location, fileIO(location));
-            clients.execute(client -> client.dropDatabase(name, true, false, 
true));
+            clients().execute(client -> client.dropDatabase(name, true, false, 
true));
         } catch (TException | IOException e) {
             throw new RuntimeException("Failed to drop database " + name, e);
         } catch (InterruptedException e) {
@@ -662,7 +682,7 @@ public class HiveCatalog extends AbstractCatalog {
     @Override
     protected void alterDatabaseImpl(String name, List<PropertyChange> 
changes) {
         try {
-            Database database = clients.run(client -> 
client.getDatabase(name));
+            Database database = clients().run(client -> 
client.getDatabase(name));
             Map<String, String> parameter = new 
HashMap<>(database.getParameters());
             Pair<Map<String, String>, Set<String>> setPropertiesToRemoveKeys =
                     PropertyChange.getSetPropertiesToRemoveKeys(changes);
@@ -675,7 +695,7 @@ public class HiveCatalog extends AbstractCatalog {
                 parameter.keySet().removeAll(removeKeys);
             }
             Database alterDatabase = convertToHiveDatabase(name, parameter);
-            clients.execute(client -> client.alterDatabase(name, 
alterDatabase));
+            clients().execute(client -> client.alterDatabase(name, 
alterDatabase));
         } catch (TException e) {
             throw new RuntimeException("Failed to alter database " + name, e);
         } catch (InterruptedException e) {
@@ -687,14 +707,14 @@ public class HiveCatalog extends AbstractCatalog {
     @Override
     protected List<String> listTablesImpl(String databaseName) {
         try {
-            List<String> tableNames = clients.run(client -> 
client.getAllTables(databaseName));
+            List<String> tableNames = clients().run(client -> 
client.getAllTables(databaseName));
             int batchSize = getBatchGetTableSize();
             List<Table> hmsTables =
                     Lists.partition(tableNames, batchSize).stream()
                             .flatMap(
                                     batchTableNames -> {
                                         try {
-                                            return clients
+                                            return clients()
                                                     .run(
                                                             client ->
                                                                     
client.getTableObjectsByName(
@@ -851,7 +871,7 @@ public class HiveCatalog extends AbstractCatalog {
         sd.setCols(columns);
 
         try {
-            clients.execute(client -> client.createTable(hiveTable));
+            clients().execute(client -> client.createTable(hiveTable));
         } catch (Exception e) {
             // we don't need to delete directories since HMS will roll back db 
and fs if failed.
             throw new RuntimeException("Failed to create table " + 
identifier.getFullName(), e);
@@ -871,14 +891,15 @@ public class HiveCatalog extends AbstractCatalog {
         }
 
         try {
-            clients.execute(
-                    client ->
-                            client.dropTable(
-                                    identifier.getDatabaseName(),
-                                    identifier.getTableName(),
-                                    false,
-                                    false,
-                                    false));
+            clients()
+                    .execute(
+                            client ->
+                                    client.dropTable(
+                                            identifier.getDatabaseName(),
+                                            identifier.getTableName(),
+                                            false,
+                                            false,
+                                            false));
         } catch (TException e) {
             throw new RuntimeException("Failed to drop view " + 
identifier.getFullName(), e);
         } catch (InterruptedException e) {
@@ -896,8 +917,8 @@ public class HiveCatalog extends AbstractCatalog {
         getDatabase(databaseName);
 
         try {
-            return clients.run(
-                    client -> client.getTables(databaseName, "*", 
TableType.VIRTUAL_VIEW));
+            return clients()
+                    .run(client -> client.getTables(databaseName, "*", 
TableType.VIRTUAL_VIEW));
         } catch (TException e) {
             throw new RuntimeException("Failed to list views in database " + 
databaseName, e);
         } catch (InterruptedException e) {
@@ -994,7 +1015,7 @@ public class HiveCatalog extends AbstractCatalog {
             Path location = pair.getLeft();
             boolean externalTable = pair.getRight();
             Table hiveTable = createHiveFormatTable(identifier, newSchema, 
location, externalTable);
-            clients.execute(client -> client.createTable(hiveTable));
+            clients().execute(client -> client.createTable(hiveTable));
         } catch (Exception e) {
             // we don't need to delete directories since HMS will roll back db 
and fs if failed.
             throw new RuntimeException("Failed to create table " + 
identifier.getFullName(), e);
@@ -1019,14 +1040,15 @@ public class HiveCatalog extends AbstractCatalog {
     protected void dropTableImpl(Identifier identifier, List<Path> 
externalPaths) {
         try {
             boolean externalTable = isExternalTable(getHmsTable(identifier));
-            clients.execute(
-                    client ->
-                            client.dropTable(
-                                    identifier.getDatabaseName(),
-                                    identifier.getTableName(),
-                                    !externalTable,
-                                    false,
-                                    true));
+            clients()
+                    .execute(
+                            client ->
+                                    client.dropTable(
+                                            identifier.getDatabaseName(),
+                                            identifier.getTableName(),
+                                            !externalTable,
+                                            false,
+                                            true));
 
             // When drop a Hive external table, only the hive metadata is 
deleted and the data files
             // are not deleted.
@@ -1064,11 +1086,12 @@ public class HiveCatalog extends AbstractCatalog {
     protected void createTableImpl(Identifier identifier, Schema schema) {
         try {
             boolean tableExists =
-                    clients.run(
-                            (client ->
-                                    client.tableExists(
-                                            identifier.getDatabaseName(),
-                                            identifier.getTableName())));
+                    clients()
+                            .run(
+                                    (client ->
+                                            client.tableExists(
+                                                    
identifier.getDatabaseName(),
+                                                    
identifier.getTableName())));
             if (tableExists) {
                 throw new RuntimeException(
                         "Table "
@@ -1097,11 +1120,15 @@ public class HiveCatalog extends AbstractCatalog {
         }
 
         try {
-            clients.execute(
-                    client ->
-                            client.createTable(
-                                    createHiveTable(
-                                            identifier, tableSchema, location, 
externalTable)));
+            clients()
+                    .execute(
+                            client ->
+                                    client.createTable(
+                                            createHiveTable(
+                                                    identifier,
+                                                    tableSchema,
+                                                    location,
+                                                    externalTable)));
         } catch (Exception e) {
             try {
                 if (!externalTable) {
@@ -1185,10 +1212,13 @@ public class HiveCatalog extends AbstractCatalog {
 
                 // update location
                 locationHelper.specifyTableLocation(table, toPath.toString());
-                clients.execute(
-                        client ->
-                                client.alter_table(
-                                        toTable.getDatabaseName(), 
toTable.getTableName(), table));
+                clients()
+                        .execute(
+                                client ->
+                                        client.alter_table(
+                                                toTable.getDatabaseName(),
+                                                toTable.getTableName(),
+                                                table));
             }
         } catch (TException e) {
             throw new RuntimeException("Failed to rename table " + 
fromTable.getFullName(), e);
@@ -1202,10 +1232,10 @@ public class HiveCatalog extends AbstractCatalog {
         try {
             String fromDB = fromTable.getDatabaseName();
             String fromTableName = fromTable.getTableName();
-            Table table = clients.run(client -> client.getTable(fromDB, 
fromTableName));
+            Table table = clients().run(client -> client.getTable(fromDB, 
fromTableName));
             table.setDbName(toTable.getDatabaseName());
             table.setTableName(toTable.getTableName());
-            clients.execute(client -> client.alter_table(fromDB, 
fromTableName, table));
+            clients().execute(client -> client.alter_table(fromDB, 
fromTableName, table));
 
             return table;
         } catch (TException e) {
@@ -1263,7 +1293,7 @@ public class HiveCatalog extends AbstractCatalog {
         Path location = getTableLocation(identifier, table);
         // file format is null, because only data table support alter table.
         updateHmsTable(table, identifier, newSchema, null, location);
-        clients.execute(client -> HiveAlterTableUtils.alterTable(client, 
identifier, table));
+        clients().execute(client -> HiveAlterTableUtils.alterTable(client, 
identifier, table));
     }
 
     @Override
@@ -1307,7 +1337,7 @@ public class HiveCatalog extends AbstractCatalog {
         // tables from file system
         List<String> tables;
         try {
-            Database database = clients.run(client -> 
client.getDatabase(databaseName));
+            Database database = clients().run(client -> 
client.getDatabase(databaseName));
             tables = listTablesInFileSystem(new 
Path(locationHelper.getDatabaseLocation(database)));
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -1357,7 +1387,7 @@ public class HiveCatalog extends AbstractCatalog {
                                     usingExternalTable(tableSchema.options()));
                 }
                 Table finalNewTable = newTable;
-                clients.execute(client -> client.createTable(finalNewTable));
+                clients().execute(client -> client.createTable(finalNewTable));
             }
 
             // repair partitions
@@ -1400,10 +1430,12 @@ public class HiveCatalog extends AbstractCatalog {
     public Table getHmsTable(Identifier identifier)
             throws TableNotExistException, TableNoPermissionException {
         try {
-            return clients.run(
-                    client ->
-                            client.getTable(
-                                    identifier.getDatabaseName(), 
identifier.getTableName()));
+            return clients()
+                    .run(
+                            client ->
+                                    client.getTable(
+                                            identifier.getDatabaseName(),
+                                            identifier.getTableName()));
         } catch (NoSuchObjectException e) {
             throw new TableNotExistException(identifier);
         } catch (TException e) {
@@ -1667,7 +1699,7 @@ public class HiveCatalog extends AbstractCatalog {
     @VisibleForTesting
     public IMetaStoreClient getHmsClient() {
         try {
-            return clients.run(client -> client);
+            return clients().run(client -> client);
         } catch (Exception e) {
             throw new RuntimeException("Failed to close hms client:", e);
         }
@@ -1691,7 +1723,7 @@ public class HiveCatalog extends AbstractCatalog {
 
         HiveCatalogLock lock =
                 new HiveCatalogLock(
-                        clients,
+                        clients(),
                         HiveCatalogLock.checkMaxSleep(hiveConf),
                         HiveCatalogLock.acquireTimeout(hiveConf));
         return Lock.fromCatalog(lock, identifier).runWithLock(callable);
diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
index 6108633398..596ccf5c19 100644
--- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
+++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/pool/TestCachedClientPool.java
@@ -417,12 +417,15 @@ public class TestCachedClientPool {
                                                 String metastoreClientClass =
                                                         
"org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
 
-                                                return new HiveCatalog(
-                                                        fileIO,
-                                                        hiveConf,
-                                                        metastoreClientClass,
-                                                        
CatalogContext.create(options),
-                                                        warehouse);
+                                                HiveCatalog hCata =
+                                                        new HiveCatalog(
+                                                                fileIO,
+                                                                hiveConf,
+                                                                
metastoreClientClass,
+                                                                
CatalogContext.create(options),
+                                                                warehouse);
+                                                hCata.listDatabases();
+                                                return hCata;
                                             } catch (Exception e) {
                                                 throw new RuntimeException(e);
                                             }

Reply via email to