sivabalan narayanan created HUDI-2773:
-----------------------------------------
Summary: Deltastreamer checkpoint copy over does not ignore
compaction metadata
Key: HUDI-2773
URL: https://issues.apache.org/jira/browse/HUDI-2773
Project: Apache Hudi
Issue Type: Bug
Reporter: sivabalan narayanan
Compaction commit metadata does not contain the Deltastreamer checkpoint
key. So when a concurrent writer tries to copy over the Deltastreamer
checkpoint, it should skip compaction metadata and look at earlier instants.
Possible fix in TransactionUtils:
{code:java}
public static Option<Pair<HoodieInstant, Map<String, String>>> getLastCompletedTxnInstantAndMetadata(
    HoodieTableMetaClient metaClient) {
  // Completed instants on the commits timeline, newest first.
  List<HoodieInstant> hoodieInstants = metaClient.getActiveTimeline().getCommitsTimeline()
      .filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
  for (HoodieInstant hoodieInstant : hoodieInstants) {
    try {
      switch (hoodieInstant.getAction()) {
        case HoodieTimeline.REPLACE_COMMIT_ACTION:
          HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
              .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
                  HoodieReplaceCommitMetadata.class);
          return Option.of(Pair.of(hoodieInstant, replaceCommitMetadata.getExtraMetadata()));
        case HoodieTimeline.DELTA_COMMIT_ACTION:
        case HoodieTimeline.COMMIT_ACTION:
          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
              .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
                  HoodieCommitMetadata.class);
          // Skip compaction instants: their operation type is UNKNOWN.
          if (!commitMetadata.getOperationType().equals(WriteOperationType.UNKNOWN)) {
            return Option.of(Pair.of(hoodieInstant, commitMetadata.getExtraMetadata()));
          } else {
            LOG.warn("Skipping compaction instants to read latest metadata");
          }
          break; // fall out of the switch and try the next (older) instant
        default:
          throw new IllegalArgumentException("Unknown instant action " + hoodieInstant.getAction());
      }
    } catch (IOException io) {
      throw new HoodieIOException("Unable to read metadata for instant " + hoodieInstant, io);
    }
  }
  // No completed instants, or only compaction instants were found.
  return Option.empty();
}
{code}
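For anyone who wants to try the lookup order outside of Hudi, here is a minimal standalone sketch of the same idea: walk the completed instants newest-first and return the first one that is not a compaction. Everything in it is an assumption made for illustration. The Instant class, the isCompaction flag, the CHECKPOINT_KEY name and lastNonCompactionInstant are hypothetical stand-ins and are not Hudi classes or APIs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Standalone sketch: the types below are simplified stand-ins, not Hudi classes.
class CheckpointLookupSketch {

  // Hypothetical key name, used only for this illustration.
  static final String CHECKPOINT_KEY = "checkpoint.key";

  // Minimal stand-in for a completed instant and its extra metadata.
  static final class Instant {
    final String timestamp;
    final boolean isCompaction;
    final Map<String, String> extraMetadata;

    Instant(String timestamp, boolean isCompaction, Map<String, String> extraMetadata) {
      this.timestamp = timestamp;
      this.isCompaction = isCompaction;
      this.extraMetadata = extraMetadata;
    }
  }

  // Expects instants ordered newest-first; returns the first non-compaction one.
  static Optional<Instant> lastNonCompactionInstant(List<Instant> newestFirst) {
    for (Instant instant : newestFirst) {
      if (instant.isCompaction) {
        continue; // compaction metadata carries no checkpoint, so look further back
      }
      return Optional.of(instant);
    }
    return Optional.empty(); // empty timeline, or compactions only
  }

  public static void main(String[] args) {
    List<Instant> newestFirst = Arrays.asList(
        new Instant("003", true, Collections.<String, String>emptyMap()),
        new Instant("002", false, Collections.singletonMap(CHECKPOINT_KEY, "offset-42")),
        new Instant("001", false, Collections.singletonMap(CHECKPOINT_KEY, "offset-17")));

    Optional<Instant> found = lastNonCompactionInstant(newestFirst);
    // prints "002 -> offset-42"
    System.out.println(found
        .map(i -> i.timestamp + " -> " + i.extraMetadata.get(CHECKPOINT_KEY))
        .orElse("none"));
  }
}
```

In the sketch the latest instant (003) is a compaction, so the checkpoint is taken from 002 instead. A timeline that holds only compactions yields an empty result, which matches the Option.empty() fallback in the proposed fix.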
--
This message was sent by Atlassian Jira
(v8.20.1#820001)