This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dd5358c419 [HUDI-9365] Reduce overhead of Hive and AWS Glue sync 
tools (#13249)
9dd5358c419 is described below

commit 9dd5358c4193c3480d7b95f6e4961566a0706238
Author: Tim Brown <[email protected]>
AuthorDate: Thu May 29 20:53:02 2025 -0500

    [HUDI-9365] Reduce overhead of Hive and AWS Glue sync tools (#13249)
---
 .../hudi/aws/sync/AWSGlueCatalogSyncClient.java    | 29 ++++++++++--------
 .../java/org/apache/hudi/hive/HiveSyncTool.java    | 35 ++++++++++++++++++----
 .../org/apache/hudi/hive/HoodieHiveSyncClient.java | 23 +++++++++-----
 .../org/apache/hudi/hive/TestHiveSyncTool.java     |  7 +++--
 .../apache/hudi/sync/common/HoodieSyncClient.java  |  6 ++--
 5 files changed, 69 insertions(+), 31 deletions(-)

diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index dccd0b4e0d2..f0579cdc1ed 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -23,7 +23,6 @@ import 
org.apache.hudi.aws.sync.util.GluePartitionFilterGenerator;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -143,11 +142,12 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
   private static final String ENABLE_MDT_LISTING = 
"hudi.metadata-listing-enabled";
   private final String databaseName;
 
-  private final Boolean skipTableArchive;
+  private final boolean skipTableArchive;
   private final String enableMetadataTable;
   private final int allPartitionsReadParallelism;
   private final int changedPartitionsReadParallelism;
   private final int changeParallelism;
+  private final Map<String, Table> initialTableByName = new HashMap<>();
 
   public AWSGlueCatalogSyncClient(HiveSyncConfig config, HoodieTableMetaClient 
metaClient) {
     this(buildAsyncClient(config), config, metaClient);
@@ -201,6 +201,10 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
     }
   }
 
+  private Table getInitialTable(String tableName) {
+    return initialTableByName.computeIfAbsent(tableName, t -> 
getTable(awsGlue, databaseName, t));
+  }
+
   @Override
   public List<Partition> getAllPartitions(String tableName) {
     ExecutorService executorService = 
Executors.newFixedThreadPool(this.allPartitionsReadParallelism, new 
CustomizedThreadFactory("glue-sync-all-partitions", true));
@@ -447,7 +451,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
 
   private String getTableDoc() {
     try {
-      return new 
TableSchemaResolver(metaClient).getTableAvroSchema(true).getDoc();
+      return tableSchemaResolver.getTableAvroSchema(true).getDoc();
     } catch (Exception e) {
       throw new HoodieGlueSyncException("Failed to get schema's doc from 
storage : ", e);
     }
@@ -456,7 +460,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
   @Override
   public List<FieldSchema> getStorageFieldSchemas() {
     try {
-      return new TableSchemaResolver(metaClient).getTableAvroSchema(true)
+      return tableSchemaResolver.getTableAvroSchema(true)
           .getFields()
           .stream()
           .map(f -> new FieldSchema(f.name(), f.schema().getType().getName(), 
f.doc()))
@@ -790,7 +794,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
     try {
       // GlueMetastoreClient returns partition keys separate from Columns, 
hence get both and merge to
       // get the Schema of the table.
-      Table table = getTable(awsGlue, databaseName, tableName);
+      Table table = getInitialTable(tableName);
       Map<String, String> partitionKeysMap =
           
table.partitionKeys().stream().collect(Collectors.toMap(Column::name, f -> 
f.type().toUpperCase()));
 
@@ -813,7 +817,11 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
         .name(tableName)
         .build();
     try {
-      return Objects.nonNull(awsGlue.getTable(request).get().table());
+      Table table = awsGlue.getTable(request).get().table();
+      if (table != null) {
+        initialTableByName.put(tableName, table);
+      }
+      return Objects.nonNull(table);
     } catch (ExecutionException e) {
       if (e.getCause() instanceof EntityNotFoundException) {
         LOG.warn("Table not found: " + tableId(databaseName, tableName), e);
@@ -869,8 +877,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
   @Override
   public Option<String> getLastCommitTimeSynced(String tableName) {
     try {
-      Table table = getTable(awsGlue, databaseName, tableName);
-      return 
Option.ofNullable(table.parameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, 
null));
+      return 
Option.ofNullable(getInitialTable(tableName).parameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC,
 null));
     } catch (Exception e) {
       throw new HoodieGlueSyncException("Fail to get last sync commit time for 
" + tableId(databaseName, tableName), e);
     }
@@ -880,8 +887,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
   public Option<String> getLastCommitCompletionTimeSynced(String tableName) {
     // Get the last commit completion time from the TBLproperties
     try {
-      Table table = getTable(awsGlue, databaseName, tableName);
-      return 
Option.ofNullable(table.parameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
 null));
+      return 
Option.ofNullable(getInitialTable(tableName).parameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
 null));
     } catch (Exception e) {
       throw new HoodieGlueSyncException("Failed to get the last commit 
completion time synced from the table " + tableName, e);
     }
@@ -1045,8 +1051,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
   @Override
   public String getTableLocation(String tableName) {
     try {
-      Table table = getTable(awsGlue, databaseName, tableName);
-      return table.storageDescriptor().location();
+      return getInitialTable(tableName).storageDescriptor().location();
     } catch (Exception e) {
       throw new HoodieGlueSyncException("Fail to get base path for the table " 
+ tableId(databaseName, tableName), e);
     }
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index ef0d2f9efc0..6769522289b 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieSyncTableStrategy;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
@@ -184,6 +185,9 @@ public class HiveSyncTool extends HoodieSyncTool implements 
AutoCloseable {
   }
 
   protected void doSync() {
+    // create database if needed
+    checkAndCreateDatabase();
+
     switch (syncClient.getTableType()) {
       case COPY_ON_WRITE:
         syncHoodieTable(snapshotTableName, false, false);
@@ -233,12 +237,7 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
     LOG.info("Trying to sync hoodie table " + tableName + " with base path " + 
syncClient.getBasePath()
         + " of type " + syncClient.getTableType());
 
-    // create database if needed
-    checkAndCreateDatabase();
-
     final boolean tableExists = syncClient.tableExists(tableName);
-    // Get the parquet schema for this table looking at the latest commit
-    MessageType schema = 
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
     // if table exists and location of the metastore table doesn't match the 
hoodie base path, recreate the table
     if (tableExists && 
!FSUtils.comparePathsWithoutScheme(syncClient.getBasePath(), 
syncClient.getTableLocation(tableName))) {
       LOG.info("basepath is updated for the table {}", tableName);
@@ -246,6 +245,14 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
       return;
     }
 
+    // Check if any sync is required
+    if (tableExists && isIncrementalSync() && isAlreadySynced(tableName)) {
+      LOG.info("Table {} is already synced with the latest commit.", 
tableName);
+      return;
+    }
+    // Get the parquet schema for this table looking at the latest commit
+    MessageType schema = 
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
+
     boolean schemaChanged;
     boolean propertiesChanged;
     try {
@@ -274,6 +281,18 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
     }
   }
 
+  private boolean isAlreadySynced(String tableName) {
+    return syncClient.getLastCommitTimeSynced(tableName)
+        .map(lastCommit -> {
+          Option<String> lastCompletion =
+              syncClient.getLastCommitCompletionTimeSynced(tableName);
+          String currentLastCommit = 
syncClient.getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(null);
+          return Objects.equals(lastCommit, currentLastCommit) && 
lastCompletion.map(clientLastCompletion ->
+              Objects.equals(clientLastCompletion, 
syncClient.getActiveTimeline().getLatestCompletionTime().orElse(null))).orElse(true);
+        })
+        .orElse(false);
+  }
+
   private void checkAndCreateDatabase() {
     // check if the database exists else create it
     if (config.getBoolean(HIVE_AUTO_CREATE_DATABASE)) {
@@ -294,7 +313,7 @@ public class HiveSyncTool extends HoodieSyncTool implements 
AutoCloseable {
   }
 
   private boolean validateAndSyncPartitions(String tableName, boolean 
tableExists) {
-    boolean syncIncremental = config.getBoolean(META_SYNC_INCREMENTAL);
+    boolean syncIncremental = isIncrementalSync();
     Option<String> lastCommitTimeSynced = (tableExists && syncIncremental)
         ? syncClient.getLastCommitTimeSynced(tableName) : Option.empty();
     Option<String> lastCommitCompletionTimeSynced = (tableExists && 
syncIncremental)
@@ -331,6 +350,10 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
     return partitionsChanged;
   }
 
+  private boolean isIncrementalSync() {
+    return config.getBoolean(META_SYNC_INCREMENTAL);
+  }
+
   protected boolean shouldRecreateAndSyncTable() {
     return config.getBooleanOrDefault(RECREATE_HIVE_TABLE_ON_ERROR);
   }
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 09ebc61bf42..315be390819 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -20,7 +20,6 @@ package org.apache.hudi.hive;
 
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ConfigUtils;
@@ -78,6 +77,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieHiveSyncClient.class);
   protected final HiveSyncConfig config;
   private final String databaseName;
+  private final Map<String, Table> initialTableByName = new HashMap<>();
   DDLExecutor ddlExecutor;
   private IMetaStoreClient client;
 
@@ -113,6 +113,16 @@ public class HoodieHiveSyncClient extends HoodieSyncClient 
{
     }
   }
 
+  private Table getInitialTable(String table) {
+    return initialTableByName.computeIfAbsent(table, t -> {
+      try {
+        return client.getTable(databaseName, t);
+      } catch (Exception ex) {
+        throw new HoodieHiveSyncException("Failed to get table " + 
tableId(databaseName, table), ex);
+      }
+    });
+  }
+
   @Override
   public void addPartitionsToTable(String tableName, List<String> 
partitionsToAdd) {
     ddlExecutor.addPartitionsToTable(tableName, partitionsToAdd);
@@ -324,8 +334,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
   public Option<String> getLastCommitTimeSynced(String tableName) {
     // Get the last commit time from the TBLproperties
     try {
-      Table table = client.getTable(databaseName, tableName);
-      return 
Option.ofNullable(table.getParameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC,
 null));
+      return 
Option.ofNullable(getInitialTable(tableName).getParameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC,
 null));
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get the last commit time 
synced from the table " + tableName, e);
     }
@@ -335,8 +344,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
   public Option<String> getLastCommitCompletionTimeSynced(String tableName) {
     // Get the last commit completion time from the TBLproperties
     try {
-      Table table = client.getTable(databaseName, tableName);
-      return 
Option.ofNullable(table.getParameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
 null));
+      return 
Option.ofNullable(getInitialTable(tableName).getParameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
 null));
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get the last commit 
completion time synced from the table " + tableName, e);
     }
@@ -439,7 +447,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
   @Override
   public List<FieldSchema> getStorageFieldSchemas() {
     try {
-      return new TableSchemaResolver(metaClient).getTableAvroSchema(false)
+      return tableSchemaResolver.getTableAvroSchema(false)
           .getFields()
           .stream()
           .map(f -> new FieldSchema(f.name(), f.schema().getType().getName(), 
f.doc()))
@@ -489,8 +497,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
   @Override
   public String getTableLocation(String tableName) {
     try {
-      Table table = client.getTable(databaseName, tableName);
-      return table.getSd().getLocation();
+      return getInitialTable(tableName).getSd().getLocation();
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get the basepath of the 
table " + tableId(databaseName, tableName), e);
     }
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index dcc22ac8acb..affa716c33f 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -46,9 +46,8 @@ import org.apache.hudi.hive.ddl.HMSDDLExecutor;
 import org.apache.hudi.hive.ddl.HiveSyncMode;
 import org.apache.hudi.hive.testutils.HiveTestUtil;
 import org.apache.hudi.hive.util.IMetaStoreClientUtil;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.metrics.MetricsReporterType;
-import org.apache.hudi.sync.common.HoodieSyncConfig;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.sync.common.model.FieldSchema;
 import org.apache.hudi.sync.common.model.Partition;
 import org.apache.hudi.sync.common.model.PartitionEvent;
@@ -114,6 +113,7 @@ import static 
org.apache.hudi.hive.testutils.HiveTestUtil.hiveSyncProps;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC;
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_INCREMENTAL;
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -275,7 +275,7 @@ public class TestHiveSyncTool {
     assertEquals(3, 
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
         "Table partitions should match the number of partitions we wrote");
     // Use META_SYNC_PARTITION_FIXMODE, sync all partition metadata
-    hiveSyncProps.setProperty(HoodieSyncConfig.META_SYNC_INCREMENTAL.key(), 
"false");
+    hiveSyncProps.setProperty(META_SYNC_INCREMENTAL.key(), "false");
     reInitHiveSyncClient();
     reSyncHiveTable();
     assertEquals(4, 
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
@@ -994,6 +994,7 @@ public class TestHiveSyncTool {
     assertEquals(0, commentCnt, "hive schema field comment numbers should 
match the avro schema field doc numbers");
 
     hiveSyncProps.setProperty(HIVE_SYNC_COMMENT.key(), "true");
+    hiveSyncProps.setProperty(META_SYNC_INCREMENTAL.key(), "false");
     reInitHiveSyncClient();
     reSyncHiveTable();
     fieldSchemas = 
hiveClient.getMetastoreFieldSchemas(HiveTestUtil.TABLE_NAME);
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index d464e0e5ba3..f453bf85666 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -58,12 +58,14 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
   protected final HoodieSyncConfig config;
   protected final PartitionValueExtractor partitionValueExtractor;
   protected final HoodieTableMetaClient metaClient;
+  protected final ParquetTableSchemaResolver tableSchemaResolver;
   private static final String TEMP_SUFFIX = "_temp";
 
   public HoodieSyncClient(HoodieSyncConfig config, HoodieTableMetaClient 
metaClient) {
     this.config = config;
     this.partitionValueExtractor = 
ReflectionUtils.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
     this.metaClient = Objects.requireNonNull(metaClient, "metaClient is null");
+    this.tableSchemaResolver = new ParquetTableSchemaResolver(metaClient);
   }
 
   public HoodieTimeline getActiveTimeline() {
@@ -98,7 +100,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
   @Override
   public MessageType getStorageSchema() {
     try {
-      return new 
ParquetTableSchemaResolver(metaClient).getTableParquetSchema();
+      return tableSchemaResolver.getTableParquetSchema();
     } catch (Exception e) {
       throw new HoodieSyncException("Failed to read schema from storage.", e);
     }
@@ -107,7 +109,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
   @Override
   public MessageType getStorageSchema(boolean includeMetadataField) {
     try {
-      return new 
ParquetTableSchemaResolver(metaClient).getTableParquetSchema(includeMetadataField);
+      return tableSchemaResolver.getTableParquetSchema(includeMetadataField);
     } catch (Exception e) {
       throw new HoodieSyncException("Failed to read schema from storage.", e);
     }

Reply via email to the mailing list.