This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9dd5358c419 [HUDI-9365] Reduce overhead of Hive and AWS Glue sync tools (#13249)
9dd5358c419 is described below
commit 9dd5358c4193c3480d7b95f6e4961566a0706238
Author: Tim Brown <[email protected]>
AuthorDate: Thu May 29 20:53:02 2025 -0500
[HUDI-9365] Reduce overhead of Hive and AWS Glue sync tools (#13249)
---
.../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 29 ++++++++++--------
.../java/org/apache/hudi/hive/HiveSyncTool.java | 35 ++++++++++++++++++----
.../org/apache/hudi/hive/HoodieHiveSyncClient.java | 23 +++++++++-----
.../org/apache/hudi/hive/TestHiveSyncTool.java | 7 +++--
.../apache/hudi/sync/common/HoodieSyncClient.java | 6 ++--
5 files changed, 69 insertions(+), 31 deletions(-)
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index dccd0b4e0d2..f0579cdc1ed 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -23,7 +23,6 @@ import org.apache.hudi.aws.sync.util.GluePartitionFilterGenerator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
@@ -143,11 +142,12 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
private static final String ENABLE_MDT_LISTING =
"hudi.metadata-listing-enabled";
private final String databaseName;
- private final Boolean skipTableArchive;
+ private final boolean skipTableArchive;
private final String enableMetadataTable;
private final int allPartitionsReadParallelism;
private final int changedPartitionsReadParallelism;
private final int changeParallelism;
+ private final Map<String, Table> initialTableByName = new HashMap<>();
public AWSGlueCatalogSyncClient(HiveSyncConfig config, HoodieTableMetaClient
metaClient) {
this(buildAsyncClient(config), config, metaClient);
@@ -201,6 +201,10 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
}
}
+ private Table getInitialTable(String tableName) {
+ return initialTableByName.computeIfAbsent(tableName, t ->
getTable(awsGlue, databaseName, t));
+ }
+
@Override
public List<Partition> getAllPartitions(String tableName) {
ExecutorService executorService =
Executors.newFixedThreadPool(this.allPartitionsReadParallelism, new
CustomizedThreadFactory("glue-sync-all-partitions", true));
@@ -447,7 +451,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
private String getTableDoc() {
try {
- return new
TableSchemaResolver(metaClient).getTableAvroSchema(true).getDoc();
+ return tableSchemaResolver.getTableAvroSchema(true).getDoc();
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get schema's doc from
storage : ", e);
}
@@ -456,7 +460,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
@Override
public List<FieldSchema> getStorageFieldSchemas() {
try {
- return new TableSchemaResolver(metaClient).getTableAvroSchema(true)
+ return tableSchemaResolver.getTableAvroSchema(true)
.getFields()
.stream()
.map(f -> new FieldSchema(f.name(), f.schema().getType().getName(),
f.doc()))
@@ -790,7 +794,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
try {
// GlueMetastoreClient returns partition keys separate from Columns,
hence get both and merge to
// get the Schema of the table.
- Table table = getTable(awsGlue, databaseName, tableName);
+ Table table = getInitialTable(tableName);
Map<String, String> partitionKeysMap =
table.partitionKeys().stream().collect(Collectors.toMap(Column::name, f ->
f.type().toUpperCase()));
@@ -813,7 +817,11 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
.name(tableName)
.build();
try {
- return Objects.nonNull(awsGlue.getTable(request).get().table());
+ Table table = awsGlue.getTable(request).get().table();
+ if (table != null) {
+ initialTableByName.put(tableName, table);
+ }
+ return Objects.nonNull(table);
} catch (ExecutionException e) {
if (e.getCause() instanceof EntityNotFoundException) {
LOG.warn("Table not found: " + tableId(databaseName, tableName), e);
@@ -869,8 +877,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
@Override
public Option<String> getLastCommitTimeSynced(String tableName) {
try {
- Table table = getTable(awsGlue, databaseName, tableName);
- return
Option.ofNullable(table.parameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC,
null));
+ return
Option.ofNullable(getInitialTable(tableName).parameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC,
null));
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to get last sync commit time for
" + tableId(databaseName, tableName), e);
}
@@ -880,8 +887,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
public Option<String> getLastCommitCompletionTimeSynced(String tableName) {
// Get the last commit completion time from the TBLproperties
try {
- Table table = getTable(awsGlue, databaseName, tableName);
- return
Option.ofNullable(table.parameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
null));
+ return
Option.ofNullable(getInitialTable(tableName).parameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
null));
} catch (Exception e) {
throw new HoodieGlueSyncException("Failed to get the last commit
completion time synced from the table " + tableName, e);
}
@@ -1045,8 +1051,7 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
@Override
public String getTableLocation(String tableName) {
try {
- Table table = getTable(awsGlue, databaseName, tableName);
- return table.storageDescriptor().location();
+ return getInitialTable(tableName).storageDescriptor().location();
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to get base path for the table "
+ tableId(databaseName, tableName), e);
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index ef0d2f9efc0..6769522289b 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieSyncTableStrategy;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
@@ -184,6 +185,9 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
}
protected void doSync() {
+ // create database if needed
+ checkAndCreateDatabase();
+
switch (syncClient.getTableType()) {
case COPY_ON_WRITE:
syncHoodieTable(snapshotTableName, false, false);
@@ -233,12 +237,7 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
LOG.info("Trying to sync hoodie table " + tableName + " with base path " +
syncClient.getBasePath()
+ " of type " + syncClient.getTableType());
- // create database if needed
- checkAndCreateDatabase();
-
final boolean tableExists = syncClient.tableExists(tableName);
- // Get the parquet schema for this table looking at the latest commit
- MessageType schema =
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
// if table exists and location of the metastore table doesn't match the
hoodie base path, recreate the table
if (tableExists &&
!FSUtils.comparePathsWithoutScheme(syncClient.getBasePath(),
syncClient.getTableLocation(tableName))) {
LOG.info("basepath is updated for the table {}", tableName);
@@ -246,6 +245,14 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
return;
}
+ // Check if any sync is required
+ if (tableExists && isIncrementalSync() && isAlreadySynced(tableName)) {
+ LOG.info("Table {} is already synced with the latest commit.",
tableName);
+ return;
+ }
+ // Get the parquet schema for this table looking at the latest commit
+ MessageType schema =
syncClient.getStorageSchema(!config.getBoolean(HIVE_SYNC_OMIT_METADATA_FIELDS));
+
boolean schemaChanged;
boolean propertiesChanged;
try {
@@ -274,6 +281,18 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
}
}
+ private boolean isAlreadySynced(String tableName) {
+ return syncClient.getLastCommitTimeSynced(tableName)
+ .map(lastCommit -> {
+ Option<String> lastCompletion =
+ syncClient.getLastCommitCompletionTimeSynced(tableName);
+ String currentLastCommit =
syncClient.getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(null);
+ return Objects.equals(lastCommit, currentLastCommit) &&
lastCompletion.map(clientLastCompletion ->
+ Objects.equals(clientLastCompletion,
syncClient.getActiveTimeline().getLatestCompletionTime().orElse(null))).orElse(true);
+ })
+ .orElse(false);
+ }
+
private void checkAndCreateDatabase() {
// check if the database exists else create it
if (config.getBoolean(HIVE_AUTO_CREATE_DATABASE)) {
@@ -294,7 +313,7 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
}
private boolean validateAndSyncPartitions(String tableName, boolean
tableExists) {
- boolean syncIncremental = config.getBoolean(META_SYNC_INCREMENTAL);
+ boolean syncIncremental = isIncrementalSync();
Option<String> lastCommitTimeSynced = (tableExists && syncIncremental)
? syncClient.getLastCommitTimeSynced(tableName) : Option.empty();
Option<String> lastCommitCompletionTimeSynced = (tableExists &&
syncIncremental)
@@ -331,6 +350,10 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
return partitionsChanged;
}
+ private boolean isIncrementalSync() {
+ return config.getBoolean(META_SYNC_INCREMENTAL);
+ }
+
protected boolean shouldRecreateAndSyncTable() {
return config.getBooleanOrDefault(RECREATE_HIVE_TABLE_ON_ERROR);
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 09ebc61bf42..315be390819 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -20,7 +20,6 @@ package org.apache.hudi.hive;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ConfigUtils;
@@ -78,6 +77,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
private static final Logger LOG =
LoggerFactory.getLogger(HoodieHiveSyncClient.class);
protected final HiveSyncConfig config;
private final String databaseName;
+ private final Map<String, Table> initialTableByName = new HashMap<>();
DDLExecutor ddlExecutor;
private IMetaStoreClient client;
@@ -113,6 +113,16 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
}
}
+ private Table getInitialTable(String table) {
+ return initialTableByName.computeIfAbsent(table, t -> {
+ try {
+ return client.getTable(databaseName, t);
+ } catch (Exception ex) {
+ throw new HoodieHiveSyncException("Failed to get table " +
tableId(databaseName, table), ex);
+ }
+ });
+ }
+
@Override
public void addPartitionsToTable(String tableName, List<String>
partitionsToAdd) {
ddlExecutor.addPartitionsToTable(tableName, partitionsToAdd);
@@ -324,8 +334,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
public Option<String> getLastCommitTimeSynced(String tableName) {
// Get the last commit time from the TBLproperties
try {
- Table table = client.getTable(databaseName, tableName);
- return
Option.ofNullable(table.getParameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC,
null));
+ return
Option.ofNullable(getInitialTable(tableName).getParameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC,
null));
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get the last commit time
synced from the table " + tableName, e);
}
@@ -335,8 +344,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
public Option<String> getLastCommitCompletionTimeSynced(String tableName) {
// Get the last commit completion time from the TBLproperties
try {
- Table table = client.getTable(databaseName, tableName);
- return
Option.ofNullable(table.getParameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
null));
+ return
Option.ofNullable(getInitialTable(tableName).getParameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
null));
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get the last commit
completion time synced from the table " + tableName, e);
}
@@ -439,7 +447,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
@Override
public List<FieldSchema> getStorageFieldSchemas() {
try {
- return new TableSchemaResolver(metaClient).getTableAvroSchema(false)
+ return tableSchemaResolver.getTableAvroSchema(false)
.getFields()
.stream()
.map(f -> new FieldSchema(f.name(), f.schema().getType().getName(),
f.doc()))
@@ -489,8 +497,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
@Override
public String getTableLocation(String tableName) {
try {
- Table table = client.getTable(databaseName, tableName);
- return table.getSd().getLocation();
+ return getInitialTable(tableName).getSd().getLocation();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get the basepath of the
table " + tableId(databaseName, tableName), e);
}
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index dcc22ac8acb..affa716c33f 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -46,9 +46,8 @@ import org.apache.hudi.hive.ddl.HMSDDLExecutor;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.IMetaStoreClientUtil;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.metrics.MetricsReporterType;
-import org.apache.hudi.sync.common.HoodieSyncConfig;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.sync.common.model.FieldSchema;
import org.apache.hudi.sync.common.model.Partition;
import org.apache.hudi.sync.common.model.PartitionEvent;
@@ -114,6 +113,7 @@ import static org.apache.hudi.hive.testutils.HiveTestUtil.hiveSyncProps;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_INCREMENTAL;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -275,7 +275,7 @@ public class TestHiveSyncTool {
assertEquals(3,
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
"Table partitions should match the number of partitions we wrote");
// Use META_SYNC_PARTITION_FIXMODE, sync all partition metadata
- hiveSyncProps.setProperty(HoodieSyncConfig.META_SYNC_INCREMENTAL.key(),
"false");
+ hiveSyncProps.setProperty(META_SYNC_INCREMENTAL.key(), "false");
reInitHiveSyncClient();
reSyncHiveTable();
assertEquals(4,
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
@@ -994,6 +994,7 @@ public class TestHiveSyncTool {
assertEquals(0, commentCnt, "hive schema field comment numbers should
match the avro schema field doc numbers");
hiveSyncProps.setProperty(HIVE_SYNC_COMMENT.key(), "true");
+ hiveSyncProps.setProperty(META_SYNC_INCREMENTAL.key(), "false");
reInitHiveSyncClient();
reSyncHiveTable();
fieldSchemas =
hiveClient.getMetastoreFieldSchemas(HiveTestUtil.TABLE_NAME);
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index d464e0e5ba3..f453bf85666 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -58,12 +58,14 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
protected final HoodieSyncConfig config;
protected final PartitionValueExtractor partitionValueExtractor;
protected final HoodieTableMetaClient metaClient;
+ protected final ParquetTableSchemaResolver tableSchemaResolver;
private static final String TEMP_SUFFIX = "_temp";
public HoodieSyncClient(HoodieSyncConfig config, HoodieTableMetaClient
metaClient) {
this.config = config;
this.partitionValueExtractor =
ReflectionUtils.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
this.metaClient = Objects.requireNonNull(metaClient, "metaClient is null");
+ this.tableSchemaResolver = new ParquetTableSchemaResolver(metaClient);
}
public HoodieTimeline getActiveTimeline() {
@@ -98,7 +100,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
@Override
public MessageType getStorageSchema() {
try {
- return new
ParquetTableSchemaResolver(metaClient).getTableParquetSchema();
+ return tableSchemaResolver.getTableParquetSchema();
} catch (Exception e) {
throw new HoodieSyncException("Failed to read schema from storage.", e);
}
@@ -107,7 +109,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
@Override
public MessageType getStorageSchema(boolean includeMetadataField) {
try {
- return new
ParquetTableSchemaResolver(metaClient).getTableParquetSchema(includeMetadataField);
+ return tableSchemaResolver.getTableParquetSchema(includeMetadataField);
} catch (Exception e) {
throw new HoodieSyncException("Failed to read schema from storage.", e);
}