vamsikarnika commented on code in PR #14260:
URL: https://github.com/apache/hudi/pull/14260#discussion_r2542596752
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataSync.java:
##########
@@ -245,90 +248,166 @@ public void run() throws Exception {
}
private void runMetadataSync(HoodieTableMetaClient sourceTableMetaClient,
HoodieTableMetaClient targetTableMetaClient, Schema schema) throws Exception {
- HoodieWriteConfig writeConfig = getWriteConfig(schema,
targetTableMetaClient, cfg.sourceBasePath);
+ HoodieWriteConfig writeConfig = getWriteConfig(schema,
targetTableMetaClient, cfg.sourceBasePath, cfg.boostrap);
+ HoodieSparkEngineContext hoodieSparkEngineContext = new
HoodieSparkEngineContext(jsc);
+ TransactionManager txnManager = new TransactionManager(writeConfig,
FSUtils.getFs(writeConfig.getBasePath(),
hoodieSparkEngineContext.getHadoopConf().get()));
+
+ HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig,
hoodieSparkEngineContext, targetTableMetaClient);
+ try (SparkRDDWriteClient writeClient = new
SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
+ if (cfg.boostrap) {
+ runBootstrapSync(sparkTable, sourceTableMetaClient,
targetTableMetaClient, writeClient, schema, txnManager);
+ } else {
+ List<HoodieInstant> instantsToSync =
getInstantsToSync(cfg.sourceBasePath, targetTableMetaClient,
sourceTableMetaClient);
+ for (HoodieInstant instant : instantsToSync) {
+ String commitTime = writeClient.startCommit(instant.getAction(),
targetTableMetaClient); // single writer. will rollback any pending commits
from previous round.
+ targetTableMetaClient
+ .reloadActiveTimeline()
+ .transitionRequestedToInflight(
+ instant.getAction(),
+ commitTime);
+
+ Option<byte[]> commitMetadataInBytes = Option.empty();
+ List<String> pendingInstants =
getPendingInstants(sourceTableMetaClient.getActiveTimeline(), instant);
+ SyncMetadata syncMetadata =
getTableSyncExtraMetadata(targetTableMetaClient.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(),
+ targetTableMetaClient, cfg.sourceBasePath,
instant.getTimestamp(), pendingInstants);
+
+ try {
+ txnManager.beginTransaction(Option.of(instant), Option.empty());
+ HoodieTableMetadataWriter hoodieTableMetadataWriter =
+ (HoodieTableMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get();
+ // perform table services if required on metadata table
+
hoodieTableMetadataWriter.performTableServices(Option.of(commitTime));
+ switch (instant.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ HoodieCommitMetadata sourceCommitMetadata =
getHoodieCommitMetadata(instant.getTimestamp(), sourceTableMetaClient);
+ HoodieCommitMetadata tgtCommitMetadata =
buildHoodieCommitMetadata(sourceCommitMetadata, commitTime);
+ hoodieTableMetadataWriter.update(tgtCommitMetadata,
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
+
+ // add metadata sync checkpoint info
+
tgtCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
+ commitMetadataInBytes =
Option.of(tgtCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+ break;
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+
+ HoodieReplaceCommitMetadata srcReplaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
+
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
+ HoodieReplaceCommitMetadata tgtReplaceCommitMetadata =
buildReplaceCommitMetadata(srcReplaceCommitMetadata, commitTime);
+ hoodieTableMetadataWriter.update(tgtReplaceCommitMetadata,
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
+
+ // add metadata sync checkpoint info
+
tgtReplaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
+ commitMetadataInBytes =
Option.of(tgtReplaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+ break;
+ case HoodieTimeline.CLEAN_ACTION:
+ HoodieCleanMetadata srcCleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(
+
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get());
+ HoodieCleanMetadata tgtCleanMetadata =
reconstructHoodieCleanCommitMetadata(srcCleanMetadata,
+ writeConfig, hoodieSparkEngineContext,
targetTableMetaClient);
+ //HoodieCleanMetadata tgtCleanMetadata =
buildHoodieCleanMetadata(srcCleanMetadata, commitTime);
+ hoodieTableMetadataWriter.update(tgtCleanMetadata, commitTime);
+
+ commitMetadataInBytes =
TimelineMetadataUtils.serializeCleanMetadata(tgtCleanMetadata);
+ break;
+ }
+
+ } finally {
+ txnManager.endTransaction(Option.of(instant));
+ }
+ targetTableMetaClient
+ .reloadActiveTimeline()
+ .saveAsComplete(new HoodieInstant(true, instant.getAction(),
commitTime), commitMetadataInBytes);
+ }
+ if (cfg.performTableMaintenance) {
+ runArchiver(sparkTable, writeClient.getConfig(),
hoodieSparkEngineContext);
+ }
+ }
+ }
+ }
- if (cfg.boostrap) {
- HoodieBootstrapMetadataSync bootstrapMetadataSync = new
HoodieBootstrapMetadataSync(writeConfig, jsc, cfg.sourceBasePath,
cfg.targetBasePath, schema);
- bootstrapMetadataSync.run();
+ private void runBootstrapSync(HoodieSparkTable sparkTable,
HoodieTableMetaClient sourceTableMetaClient,
+ HoodieTableMetaClient targetTableMetaClient,
SparkRDDWriteClient writeClient, Schema schema, TransactionManager txnManager)
throws Exception {
+ Option<HoodieInstant> sourceLastInstant =
sourceTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant();
+ if (!sourceLastInstant.isPresent()) {
return;
}
- List<HoodieInstant> instantsToSync = getInstantsToSync(cfg.sourceBasePath,
targetTableMetaClient, sourceTableMetaClient);
- HoodieSparkEngineContext hoodieSparkEngineContext = new
HoodieSparkEngineContext(jsc);
- for (HoodieInstant instant : instantsToSync) {
- try (SparkRDDWriteClient writeClient = new
SparkRDDWriteClient(hoodieSparkEngineContext, writeConfig)) {
- String commitTime = writeClient.startCommit(instant.getAction(),
targetTableMetaClient); // single writer. will rollback any pending commits
from previous round.
- targetTableMetaClient
- .reloadActiveTimeline()
- .transitionRequestedToInflight(
- instant.getAction(),
- commitTime);
-
- Option<byte[]> commitMetadataInBytes = Option.empty();
- List<String> pendingInstants =
getPendingInstants(sourceTableMetaClient.getActiveTimeline(), instant);
- SyncMetadata syncMetadata =
getTableSyncExtraMetadata(targetTableMetaClient.reloadActiveTimeline().getWriteTimeline().filterCompletedInstants().lastInstant(),
- targetTableMetaClient,
sourceTableMetaClient.getBasePathV2().toString(), instant.getTimestamp(),
pendingInstants);
-
-
- HoodieSparkTable sparkTable = HoodieSparkTable.create(writeConfig,
hoodieSparkEngineContext, targetTableMetaClient);
- if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
- HoodieCommitMetadata sourceCommitMetadata =
getHoodieCommitMetadata(instant.getTimestamp(), sourceTableMetaClient);
- HoodieCommitMetadata tgtCommitMetadata =
buildHoodieCommitMetadata(sourceCommitMetadata, commitTime);
-
- try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
- (HoodieTableMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get()) {
- hoodieTableMetadataWriter.update(tgtCommitMetadata,
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
- }
+ String commitTime =
writeClient.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION,
targetTableMetaClient); // single writer. will rollback any pending commits
from previous round.
+ targetTableMetaClient
+ .reloadActiveTimeline()
+ .transitionRequestedToInflight(
+ HoodieTimeline.REPLACE_COMMIT_ACTION,
+ commitTime);
- // add metadata sync checkpoint info
- tgtCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
- commitMetadataInBytes =
Option.of(tgtCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
- } else if
(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+ HoodieInstant instant = new HoodieInstant(true,
HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime);
+ try {
+ txnManager.beginTransaction(Option.of(instant), Option.empty());
+ SparkHoodieBackedMetadataSyncMetadataWriter metadataWriter =
+ (SparkHoodieBackedMetadataSyncMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get();
+
metadataWriter.bootstrap(sourceLastInstant.map(HoodieInstant::getTimestamp));
+ } finally {
+ txnManager.endTransaction(Option.of(instant));
+ }
+ Option<HoodieInstant> targetTableLastInstant =
targetTableMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+ SyncMetadata syncMetadata =
getTableSyncExtraMetadata(targetTableLastInstant, targetTableMetaClient,
+ cfg.sourceBasePath, sourceLastInstant.get().getTimestamp(),
getPendingInstants(sourceTableMetaClient.getActiveTimeline(),
sourceLastInstant.get()));
+ HoodieReplaceCommitMetadata replaceCommitMetadata =
buildComprehensiveReplaceCommitMetadata(sourceTableMetaClient, schema);
+ replaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
+ targetTableMetaClient
+ .reloadActiveTimeline()
+ .saveAsComplete(new HoodieInstant(true,
HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime),
+
Option.of(replaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ }
- HoodieReplaceCommitMetadata srcReplaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
-
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
- HoodieReplaceCommitMetadata tgtReplaceCommitMetadata =
buildReplaceCommitMetadata(srcReplaceCommitMetadata, commitTime);
+ private HoodieReplaceCommitMetadata
buildComprehensiveReplaceCommitMetadata(HoodieTableMetaClient
sourceTableMetaClient, Schema schema) {
+ List<HoodieInstant> replaceCommits =
sourceTableMetaClient.getActiveTimeline().filterCompletedInstants().getInstants().stream()
+ .filter(instant ->
instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).collect(Collectors.toList());
+
+ HoodieReplaceCommitMetadata replaceCommitMetadata = new
HoodieReplaceCommitMetadata();
+ Map<String, List<String>> totalPartitionToReplacedFiles = new HashMap<>();
+ replaceCommits.forEach(replaceCommit -> {
+ try {
+ HoodieReplaceCommitMetadata commitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
+
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(replaceCommit).get(),
HoodieReplaceCommitMetadata.class);
+ Map<String, List<String>> partitionsToReplacedFileGroups =
commitMetadata.getPartitionToReplaceFileIds();
+ for (Map.Entry<String, List<String>> entry :
partitionsToReplacedFileGroups.entrySet()) {
+ String partition = entry.getKey();
+ List<String> replacedFiles = entry.getValue();
+ totalPartitionToReplacedFiles.computeIfAbsent(partition, k -> new
ArrayList<>());
+ totalPartitionToReplacedFiles.get(partition).addAll(replacedFiles);
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Failed to deserialize instant " +
replaceCommit.getTimestamp(), e);
+ }
+ });
- try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
- (HoodieTableMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get()) {
- hoodieTableMetadataWriter.update(tgtReplaceCommitMetadata,
hoodieSparkEngineContext.emptyHoodieData(), commitTime);
- }
- // add metadata sync checkpoint info
-
tgtReplaceCommitMetadata.addMetadata(SyncMetadata.TABLE_SYNC_METADATA,
syncMetadata.toJson());
- commitMetadataInBytes =
Option.of(tgtReplaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
- } else if (instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)) {
- HoodieCleanMetadata srcCleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(
-
sourceTableMetaClient.getCommitsTimeline().getInstantDetails(instant).get());
- HoodieCleanMetadata tgtCleanMetadata =
reconstructHoodieCleanCommitMetadata(srcCleanMetadata,
- writeConfig, hoodieSparkEngineContext, targetTableMetaClient);
- //HoodieCleanMetadata tgtCleanMetadata =
buildHoodieCleanMetadata(srcCleanMetadata, commitTime);
- try (HoodieTableMetadataWriter hoodieTableMetadataWriter =
- (HoodieTableMetadataWriter)
sparkTable.getMetadataWriter(commitTime).get()) {
- hoodieTableMetadataWriter.update(tgtCleanMetadata, commitTime);
- }
+
replaceCommitMetadata.setPartitionToReplaceFileIds(totalPartitionToReplacedFiles);
- commitMetadataInBytes =
TimelineMetadataUtils.serializeCleanMetadata(tgtCleanMetadata);
- }
+ replaceCommitMetadata.addMetadata("schema", schema.toString());
+ replaceCommitMetadata.setOperationType(WriteOperationType.BOOTSTRAP);
+ replaceCommitMetadata.setCompacted(false);
+ return replaceCommitMetadata;
+ }
- targetTableMetaClient
- .reloadActiveTimeline()
- .saveAsComplete(new HoodieInstant(true, instant.getAction(),
commitTime), commitMetadataInBytes);
- }
+ private void runArchiver(
Review Comment:
I think the Archival Job handles locks internally? Do we need to explicitly set the
lock? Need to check the code. For now, I'll move this code into the synchronized
block.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]