[
https://issues.apache.org/jira/browse/HUDI-2773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
sivabalan narayanan updated HUDI-2773:
--------------------------------------
Status: Patch Available (was: In Progress)
> Deltastreamer checkpoint copy over does not ignore compaction metadata
> ----------------------------------------------------------------------
>
> Key: HUDI-2773
> URL: https://issues.apache.org/jira/browse/HUDI-2773
> Project: Apache Hudi
> Issue Type: Sub-task
> Reporter: sivabalan narayanan
> Assignee: sivabalan narayanan
> Priority: Blocker
> Fix For: 0.10.0
>
>
> compaction commit metadata is not going to have the deltastreamer checkpoint
> key. so, when a concurrent writer is trying to copy over deltastreamer
> checkpoint, it should skip compaction metadata and look at previous instants.
> //possible fix in TransactionUtils
> {code:java}
> public static Option<Pair<HoodieInstant, Map<String, String>>>
> getLastCompletedTxnInstantAndMetadata(
> HoodieTableMetaClient metaClient) {
> List<HoodieInstant> hoodieInstants =
> metaClient.getActiveTimeline().getCommitsTimeline()
>
> .filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
> if (!hoodieInstants.isEmpty()) {
> for (HoodieInstant hoodieInstant : hoodieInstants) {
> try {
> switch (hoodieInstant.getAction()) {
> case HoodieTimeline.REPLACE_COMMIT_ACTION:
> HoodieReplaceCommitMetadata replaceCommitMetadata =
> HoodieReplaceCommitMetadata
>
> .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
> HoodieReplaceCommitMetadata.class);
> return Option.of(Pair.of(hoodieInstant,
> replaceCommitMetadata.getExtraMetadata()));
> case HoodieTimeline.DELTA_COMMIT_ACTION:
> case HoodieTimeline.COMMIT_ACTION:
> HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
>
> .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
> HoodieCommitMetadata.class);
> if
> (!commitMetadata.getOperationType().equals(WriteOperationType.UNKNOWN)) { //
> skip compaction instants
> return Option.of(Pair.of(hoodieInstant,
> commitMetadata.getExtraMetadata()));
> } else {
> LOG.warn("Skipping compaction instants to read latest
> metadata");
> }
> break;
> default:
> throw new IllegalArgumentException("Unknown instant action" +
> hoodieInstant.getAction());
> }
> } catch (IOException io) {
> throw new HoodieIOException("Unable to read metadata for instant " +
> hoodieInstant, io);
> }
> }
> return Option.empty();
> } else {
> return Option.empty();
> }
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)