This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bada8110120 [HUDI-7494]: multi writer sync partition to glue will
miss some partitions (#10841)
bada8110120 is described below
commit bada811012095232816aead7ba3f542bc3620b7d
Author: Nicolas Paris <[email protected]>
AuthorDate: Sat Jul 20 11:23:37 2024 +0200
[HUDI-7494]: multi writer sync partition to glue will miss some
partitions (#10841)
---
.../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 43 +++++++++++++++++-----
.../org/apache/hudi/hive/HoodieHiveSyncClient.java | 4 +-
2 files changed, 37 insertions(+), 10 deletions(-)
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 60d6a1e708c..520f4364837 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -23,6 +23,8 @@ import
org.apache.hudi.aws.sync.util.GluePartitionFilterGenerator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.common.util.HoodieTimer;
@@ -841,12 +843,23 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
public Option<String> getLastCommitTimeSynced(String tableName) {
try {
Table table = getTable(awsGlue, databaseName, tableName);
- return
Option.ofNullable(table.parameters().get(HOODIE_LAST_COMMIT_TIME_SYNC));
+ return
Option.ofNullable(table.parameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC,
null));
} catch (Exception e) {
throw new HoodieGlueSyncException("Fail to get last sync commit time for
" + tableId(databaseName, tableName), e);
}
}
+ @Override
+ public Option<String> getLastCommitCompletionTimeSynced(String tableName) {
+ // Get the last commit completion time from the TBLproperties
+ try {
+ Table table = getTable(awsGlue, databaseName, tableName);
+ return
Option.ofNullable(table.parameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
null));
+ } catch (Exception e) {
+ throw new HoodieGlueSyncException("Failed to get the last commit
completion time synced from the table " + tableName, e);
+ }
+ }
+
@Override
public void close() {
awsGlue.close();
@@ -854,15 +867,27 @@ public class AWSGlueCatalogSyncClient extends
HoodieSyncClient {
@Override
public void updateLastCommitTimeSynced(String tableName) {
- if (!getActiveTimeline().lastInstant().isPresent()) {
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<String> lastCommitSynced =
activeTimeline.lastInstant().map(HoodieInstant::getTimestamp);
+ Option<String> lastCommitCompletionSynced = activeTimeline
+ .getInstantsOrderedByCompletionTime()
+ .skip(activeTimeline.countInstants() - 1)
+ .findFirst()
+ .map(i -> Option.of(i.getCompletionTime()))
+ .orElse(Option.empty());
+ if (lastCommitSynced.isPresent()) {
+ try {
+ HashMap<String, String> propertyMap = new HashMap<>();
+ propertyMap.put(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced.get());
+ if (lastCommitCompletionSynced.isPresent()) {
+ propertyMap.put(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
lastCommitCompletionSynced.get());
+ }
+ updateTableParameters(awsGlue, databaseName, tableName, propertyMap,
skipTableArchive);
+ } catch (Exception e) {
+ throw new HoodieGlueSyncException("Fail to update last sync commit
time for " + tableId(databaseName, tableName), e);
+ }
+ } else {
LOG.warn("No commit in active timeline.");
- return;
- }
- final String lastCommitTimestamp =
getActiveTimeline().lastInstant().get().getTimestamp();
- try {
- updateTableParameters(awsGlue, databaseName, tableName,
Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTimestamp),
skipTableArchive);
- } catch (Exception e) {
- throw new HoodieGlueSyncException("Fail to update last sync commit time
for " + tableId(databaseName, tableName), e);
}
try {
// as a side effect, we also refresh the partition indexes if needed
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index ebf5dc7368e..ad96e511af6 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -418,7 +418,9 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
SerDeInfo serdeInfo = sd.getSerdeInfo();
serdeInfo.putToParameters(ConfigUtils.TABLE_SERDE_PATH, basePath);
table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC,
lastCommitSynced.get());
- table.putToParameters(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
lastCommitCompletionSynced.get());
+ if (lastCommitCompletionSynced.isPresent()) {
+ table.putToParameters(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
lastCommitCompletionSynced.get());
+ }
client.alter_table(databaseName, tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get update last commit
time synced to " + lastCommitSynced, e);