sivabalan narayanan created HUDI-2773:
-----------------------------------------

             Summary: Deltastreamer checkpoint copy over does not ignore 
compaction metadata
                 Key: HUDI-2773
                 URL: https://issues.apache.org/jira/browse/HUDI-2773
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: sivabalan narayanan


compaction commit metadata is not going to have the deltastreamer checkpoint 
key. so, when a concurrent writer is trying to copy over deltastreamer 
checkpoint, it should skip compaction metadata and look at previous instants. 

//possible fix in TransactionUtils
{code:java}
public static Option<Pair<HoodieInstant, Map<String, String>>> 
getLastCompletedTxnInstantAndMetadata(
    HoodieTableMetaClient metaClient) {
  List<HoodieInstant> hoodieInstants = 
metaClient.getActiveTimeline().getCommitsTimeline()
      
.filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toList());
  if (!hoodieInstants.isEmpty()) {
    for (HoodieInstant hoodieInstant : hoodieInstants) {
      try {
        switch (hoodieInstant.getAction()) {
          case HoodieTimeline.REPLACE_COMMIT_ACTION:
            HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata
                
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
 HoodieReplaceCommitMetadata.class);
            return Option.of(Pair.of(hoodieInstant, 
replaceCommitMetadata.getExtraMetadata()));
          case HoodieTimeline.DELTA_COMMIT_ACTION:
          case HoodieTimeline.COMMIT_ACTION:
            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
                
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
 HoodieCommitMetadata.class);
            if 
(!commitMetadata.getOperationType().equals(WriteOperationType.UNKNOWN)) { // 
skip compaction instants
              return Option.of(Pair.of(hoodieInstant, 
commitMetadata.getExtraMetadata()));
            } else {
              LOG.warn("Skipping compaction instants to read latest metadata");
            }
            break;
          default:
            throw new IllegalArgumentException("Unknown instant action" + 
hoodieInstant.getAction());
        }
      } catch (IOException io) {
        throw new HoodieIOException("Unable to read metadata for instant " + 
hoodieInstant, io);
      }
    }
    return Option.empty();
  } else {
    return Option.empty();
  }

} {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to