[ 
https://issues.apache.org/jira/browse/HUDI-5077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18010903#comment-18010903
 ] 

Harsha Gudladona commented on HUDI-5077:
----------------------------------------

We are interested in this feature and willing to work on it as well. Thinking 
through the current implementation below, wouldn't it be simpler to find the 
latest commit that matches the topic of the deltastreamer? In other words, 
getLatestInstantAndCommitMetadataWithValidCheckpointInfo would become 
getLatestInstantAndCommitMetadataWithValidCheckpointInfoForTopic. We can make 
it the user's responsibility to ensure that two deltastreamers do not run for 
the same topic. We could even go further: if there is an inflight commit (not 
rolled back) for the same topic, exit gracefully. 
{code:java}
@VisibleForTesting
static Option<Checkpoint> resolveCheckpointBetweenConfigAndPrevCommit(HoodieTimeline commitsTimeline,
                                                                      HoodieStreamer.Config streamerConfig,
                                                                      TypedProperties props) throws IOException {
  Option<Checkpoint> resumeCheckpoint = Option.empty();
  // has deltacommit and this is a MOR table, then we should get checkpoint from .deltacommit
  // if changing from mor to cow, before changing we must do a full compaction, so we can only consider .commit in such case
  if (streamerConfig.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
    // try get checkpoint from commits(including commit and deltacommit)
    // in COW migrating to MOR case, the first batch of the deltastreamer will lose the checkpoint from COW table, causing data loss
    HoodieTimeline deltaCommitTimeline = commitsTimeline.filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
    if (!deltaCommitTimeline.empty()) {
      commitsTimeline = deltaCommitTimeline;
    }
  }
  Option<HoodieInstant> lastCommit = commitsTimeline.lastInstant();
  if (lastCommit.isPresent()) {
    // if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
    Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitsTimeline);
    int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
    if (commitMetadataOption.isPresent()) {
      HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
      Checkpoint checkpointFromCommit = CheckpointUtils.getCheckpoint(commitMetadata);
      LOG.debug("Checkpoint reset from metadata: " + checkpointFromCommit.getCheckpointResetKey());
      if (ignoreCkpCfgPrevailsOverCkpFromPrevCommit(streamerConfig, checkpointFromCommit)) {
        // we ignore any existing checkpoint and start ingesting afresh
        resumeCheckpoint = Option.empty();
      } else if (ckpOverrideCfgPrevailsOverCkpFromPrevCommit(streamerConfig, checkpointFromCommit)) {
        resumeCheckpoint = Option.of(buildCheckpointFromConfigOverride(
            streamerConfig.sourceClassName, writeTableVersion, streamerConfig.checkpoint));
      } else if (shouldUseCkpFromPrevCommit(checkpointFromCommit)) {
        //if previous checkpoint is an empty string, skip resume use Option.empty()
        resumeCheckpoint = Option.of(checkpointFromCommit);
      } else if (compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
          LESSER_THAN, lastCommit.get().requestedTime())) {
        throw new HoodieStreamerException(
            "Unable to find previous checkpoint. Please double check if this table "
                + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
                + commitsTimeline.getInstants());
      }
      // KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
      if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))) {
        removeConfigFromProps(props, KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE);
      }
    } else if (streamerConfig.checkpoint != null) {
      // getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set.
      resumeCheckpoint = Option.of(buildCheckpointFromConfigOverride(streamerConfig.sourceClassName, writeTableVersion, streamerConfig.checkpoint));
    }
  }
  return resumeCheckpoint;
} 

...


public static Option<Pair<String, HoodieCommitMetadata>> getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline)
    throws IOException {
  return (Option<Pair<String, HoodieCommitMetadata>>) timeline.getReverseOrderedInstants().map(instant -> {
    try {
      HoodieCommitMetadata commitMetadata = timeline.readCommitMetadata(instant);
      if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY))
          || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))
          || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
          || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
        return Option.of(Pair.of(instant.toString(), commitMetadata));
      } else {
        return Option.empty();
      }
    } catch (IOException e) {
      throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
    }
  }).filter(Option::isPresent).findFirst().orElse(Option.empty());
}{code}
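
To make the per-topic idea concrete, here is a rough sketch using plain Java collections rather than Hudi's HoodieTimeline and HoodieCommitMetadata. Everything in it is hypothetical and for illustration only: the CommitInfo class, the "streamer.topic" and "streamer.checkpoint" metadata keys, and both method names. It also assumes the topic is recorded on inflight instants, which the current code does not do.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for one timeline instant plus its commit metadata. */
final class CommitInfo {
  final String instantTime;
  final boolean inflight;
  final Map<String, String> extraMetadata;

  CommitInfo(String instantTime, boolean inflight, Map<String, String> extraMetadata) {
    this.instantTime = instantTime;
    this.inflight = inflight;
    this.extraMetadata = extraMetadata;
  }
}

final class TopicCheckpointLookup {
  // Hypothetical metadata keys; these are not the real Hudi constants.
  static final String TOPIC_KEY = "streamer.topic";
  static final String CHECKPOINT_KEY = "streamer.checkpoint";

  /**
   * Walks the timeline from newest to oldest and returns the checkpoint of the
   * first completed commit that was written for the given topic.
   */
  static Optional<String> latestCheckpointForTopic(List<CommitInfo> oldestFirst, String topic) {
    for (int i = oldestFirst.size() - 1; i >= 0; i--) {
      CommitInfo commit = oldestFirst.get(i);
      if (commit.inflight) {
        continue; // only completed commits carry a usable checkpoint
      }
      String checkpoint = commit.extraMetadata.get(CHECKPOINT_KEY);
      if (topic.equals(commit.extraMetadata.get(TOPIC_KEY))
          && checkpoint != null && !checkpoint.isEmpty()) {
        return Optional.of(checkpoint);
      }
    }
    return Optional.empty();
  }

  /** True if some inflight (not rolled back) commit is tagged with the same topic. */
  static boolean hasInflightCommitForTopic(List<CommitInfo> commits, String topic) {
    return commits.stream()
        .anyMatch(c -> c.inflight && topic.equals(c.extraMetadata.get(TOPIC_KEY)));
  }
}
```

A streamer for a given topic would call hasInflightCommitForTopic first and exit gracefully if it returns true; otherwise it would resume from latestCheckpointForTopic, skipping commits written by streamers for other topics.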

> Supporting multiple deltastreamers writing to a single hudi table
> -----------------------------------------------------------------
>
>                 Key: HUDI-5077
>                 URL: https://issues.apache.org/jira/browse/HUDI-5077
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: deltastreamer
>            Reporter: sivabalan narayanan
>            Priority: Major
>
> As of now, only a single deltastreamer can write to a single hudi 
> table. We have an ask from the community to have 2 deltastreamers write to a 
> single table. 
>  
> Things required to be fixed:
>  # We need to fix the checkpointing to have multiple key-value pairs, where 
> the key represents a unique identifier for the deltastreamer client and the 
> value represents the checkpoint. We might need to introduce a new notion of 
> identifier for each deltastreamer in this case.
>  # Within delta sync, after writeClient.upsert and before calling 
> writeClient.commit, we need to update the checkpoint value. For this, we 
> might need to take a lock, fetch the latest checkpoint from the timeline 
> (since there could be multiple writers), update the checkpoint, and then 
> release the lock. 
>  
> These are the changes I can think of. Maybe while implementing it, some 
> more minor fixes will be required. 
>  
> ask from a user: https://github.com/apache/hudi/issues/6718
>  
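
The two fixes listed in the issue description above (a checkpoint map keyed by streamer identifier, updated under a lock just before commit) could look roughly like the sketch below. It is an illustration under assumptions, not Hudi code: KeyedCheckpointStore and commitWithCheckpoint are hypothetical names, a ReentrantLock stands in for the table-level lock, and an in-memory map stands in for the checkpoint metadata of the latest commit on the timeline.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-memory model of per-streamer checkpoints stored in commit metadata. */
final class KeyedCheckpointStore {
  // Stands in for the table-level lock that every writer would have to take.
  private final ReentrantLock lock = new ReentrantLock();
  // Stands in for the checkpoint map read from the latest completed commit.
  private Map<String, String> latestCommitted = new HashMap<>();

  /**
   * Takes the lock, re-reads the latest checkpoint map, updates only this
   * streamer's entry, "commits" the merged map, and releases the lock.
   */
  Map<String, String> commitWithCheckpoint(String streamerId, String newCheckpoint) {
    lock.lock();
    try {
      Map<String, String> merged = new HashMap<>(latestCommitted); // fetch latest
      merged.put(streamerId, newCheckpoint);                       // update own key only
      latestCommitted = merged;                                    // commit
      return new HashMap<>(merged);
    } finally {
      lock.unlock();
    }
  }
}
```

Because each writer re-reads the map while holding the lock, a commit by one streamer carries forward the other streamers' checkpoints instead of overwriting them.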



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
