[ 
https://issues.apache.org/jira/browse/HUDI-2773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2773:
--------------------------------------
    Priority: Blocker  (was: Critical)

> Deltastreamer checkpoint copy over does not ignore compaction metadata
> ----------------------------------------------------------------------
>
>                 Key: HUDI-2773
>                 URL: https://issues.apache.org/jira/browse/HUDI-2773
>             Project: Apache Hudi
>          Issue Type: Sub-task
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Blocker
>             Fix For: 0.10.0
>
>
> compaction commit metadata is not going to have the deltastreamer checkpoint 
> key. so, when a concurrent writer is trying to copy over deltastreamer 
> checkpoint, it should skip compaction metadata and look at previous instants. 
> //possible fix in TransactionUtils
> {code:java}
> public static Option<Pair<HoodieInstant, Map<String, String>>> 
> getLastCompletedTxnInstantAndMetadata(
>     HoodieTableMetaClient metaClient) {
>   List<HoodieInstant> hoodieInstants = 
> metaClient.getActiveTimeline().getCommitsTimeline()
>       
> .filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
>   if (!hoodieInstants.isEmpty()) {
>     for (HoodieInstant hoodieInstant : hoodieInstants) {
>       try {
>         switch (hoodieInstant.getAction()) {
>           case HoodieTimeline.REPLACE_COMMIT_ACTION:
>             HoodieReplaceCommitMetadata replaceCommitMetadata = 
> HoodieReplaceCommitMetadata
>                 
> .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
>  HoodieReplaceCommitMetadata.class);
>             return Option.of(Pair.of(hoodieInstant, 
> replaceCommitMetadata.getExtraMetadata()));
>           case HoodieTimeline.DELTA_COMMIT_ACTION:
>           case HoodieTimeline.COMMIT_ACTION:
>             HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
>                 
> .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
>  HoodieCommitMetadata.class);
>             if 
> (!commitMetadata.getOperationType().equals(WriteOperationType.UNKNOWN)) { // 
> skip compaction instants
>               return Option.of(Pair.of(hoodieInstant, 
> commitMetadata.getExtraMetadata()));
>             } else {
>               LOG.warn("Skipping compaction instants to read latest 
> metadata");
>             }
>             break;
>           default:
>             throw new IllegalArgumentException("Unknown instant action" + 
> hoodieInstant.getAction());
>         }
>       } catch (IOException io) {
>         throw new HoodieIOException("Unable to read metadata for instant " + 
> hoodieInstant, io);
>       }
>     }
>     return Option.empty();
>   } else {
>     return Option.empty();
>   }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to