hudi-bot opened a new issue, #15508:
URL: https://github.com/apache/hudi/issues/15508

   As of now, only a single deltastreamer can write to a given Hudi table. The 
community has asked for support for two deltastreamers writing to a single 
table. 
   
    
   
   Things that need to be fixed:
    # The checkpointing must support multiple key-value pairs, where the key is 
a unique identifier for the deltastreamer client and the value is that client's 
checkpoint. This likely requires introducing a new notion of identifier for 
each deltastreamer.
    # Within delta sync, after writeClient.upsert and before calling 
writeClient.commit, we need to update the checkpoint value. To do this, we may 
need to take a lock, fetch the latest checkpoint from the timeline (since there 
could be multiple writers), update the checkpoint, and then release the lock. 
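The two items above can be illustrated with a small, self-contained Java sketch. It deliberately uses no Hudi API: `ToyTimeline`, the `checkpoint.<writerId>` metadata key prefix, and the in-process `ReentrantLock` are hypothetical stand-ins for the table timeline, the commit extra-metadata keys, and a table-level lock. It assumes every commit carries forward the latest checkpoint of every writer, so a writer only has to read the latest commit to resume.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical in-memory stand-in for a table timeline (not a Hudi API). */
class ToyTimeline {
  // Hypothetical metadata key prefix: one key-value pair per deltastreamer (item 1).
  static final String CKP_PREFIX = "checkpoint.";

  // Extra metadata of each completed commit, oldest first.
  private final List<Map<String, String>> commits = new ArrayList<>();
  // Stands in for the table-level lock a real multi-writer setup would use.
  private final ReentrantLock lock = new ReentrantLock();

  /** Resolves one writer's checkpoint from the latest completed commit. */
  Optional<String> checkpointFor(String writerId) {
    lock.lock();
    try {
      if (commits.isEmpty()) {
        return Optional.empty();
      }
      Map<String, String> latest = commits.get(commits.size() - 1);
      return Optional.ofNullable(latest.get(CKP_PREFIX + writerId));
    } finally {
      lock.unlock();
    }
  }

  /**
   * Item 2: after the upsert and before the commit, take the lock, re-read the
   * latest checkpoints (another writer may have committed meanwhile), replace
   * only this writer's entry, record the commit, and release the lock.
   */
  void commitWithCheckpoint(String writerId, String checkpoint) {
    lock.lock();
    try {
      Map<String, String> metadata = new HashMap<>();
      if (!commits.isEmpty()) {
        // Carry forward every writer's checkpoint from the latest commit.
        for (Map.Entry<String, String> e : commits.get(commits.size() - 1).entrySet()) {
          if (e.getKey().startsWith(CKP_PREFIX)) {
            metadata.put(e.getKey(), e.getValue());
          }
        }
      }
      metadata.put(CKP_PREFIX + writerId, checkpoint);
      commits.add(metadata);
    } finally {
      lock.unlock();
    }
  }
}
```

The key design choice is re-reading the latest commit while holding the lock. If a writer read the checkpoints before acquiring the lock, a commit made by the other writer in between would be dropped from the carried-forward map, silently rewinding that writer's checkpoint.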
   
    
   
   These are the changes I can think of. Some additional minor fixes may turn 
up during implementation. 
   
    
   
   Request from a user: https://github.com/apache/hudi/issues/6718
   
    
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-5077
   - Type: Improvement
   
   
   ---
   
   
   ## Comments
   
   30/Jul/25 12:33;gudladona;We are interested in this feature and willing to 
work on it as well. Thinking through the current implementation (below): 
wouldn't it be simpler to find the latest commit whose checkpoint matches the 
topic of the deltastreamer? In other words, 
getLatestInstantAndCommitMetadataWithValidCheckpointInfo would become 
getLatestInstantAndCommitMetadataWithValidCheckpointInfoForTopic. We can make 
it the user's responsibility to ensure that two deltastreamers do not run for 
the same topic; we can even go further and exit gracefully if there is an 
inflight commit (not rolled back) for the same topic. 
   {code:java}
   @VisibleForTesting
   static Option<Checkpoint> resolveCheckpointBetweenConfigAndPrevCommit(HoodieTimeline commitsTimeline,
                                                                         HoodieStreamer.Config streamerConfig,
                                                                         TypedProperties props) throws IOException {
     Option<Checkpoint> resumeCheckpoint = Option.empty();
     // has deltacommit and this is a MOR table, then we should get checkpoint from .deltacommit
     // if changing from mor to cow, before changing we must do a full compaction, so we can only consider .commit in such case
     if (streamerConfig.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
       // try get checkpoint from commits(including commit and deltacommit)
       // in COW migrating to MOR case, the first batch of the deltastreamer will lost the checkpoint from COW table, cause the dataloss
       HoodieTimeline deltaCommitTimeline = commitsTimeline.filter(instant -> instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
       if (!deltaCommitTimeline.empty()) {
         commitsTimeline = deltaCommitTimeline;
       }
     }
     Option<HoodieInstant> lastCommit = commitsTimeline.lastInstant();
     if (lastCommit.isPresent()) {
       // if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
       Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitsTimeline);
       int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
       if (commitMetadataOption.isPresent()) {
         HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
         Checkpoint checkpointFromCommit = CheckpointUtils.getCheckpoint(commitMetadata);
         LOG.debug("Checkpoint reset from metadata: " + checkpointFromCommit.getCheckpointResetKey());
         if (ignoreCkpCfgPrevailsOverCkpFromPrevCommit(streamerConfig, checkpointFromCommit)) {
           // we ignore any existing checkpoint and start ingesting afresh
           resumeCheckpoint = Option.empty();
         } else if (ckpOverrideCfgPrevailsOverCkpFromPrevCommit(streamerConfig, checkpointFromCommit)) {
           resumeCheckpoint = Option.of(buildCheckpointFromConfigOverride(
               streamerConfig.sourceClassName, writeTableVersion, streamerConfig.checkpoint));
         } else if (shouldUseCkpFromPrevCommit(checkpointFromCommit)) {
           //if previous checkpoint is an empty string, skip resume use Option.empty()
           resumeCheckpoint = Option.of(checkpointFromCommit);
         } else if (compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
             LESSER_THAN, lastCommit.get().requestedTime())) {
           throw new HoodieStreamerException(
               "Unable to find previous checkpoint. Please double check if this table "
                   + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
                   + commitsTimeline.getInstants());
         }
         // KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
         if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))) {
           removeConfigFromProps(props, KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE);
         }
       } else if (streamerConfig.checkpoint != null) {
         // getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set.
         resumeCheckpoint = Option.of(buildCheckpointFromConfigOverride(streamerConfig.sourceClassName, writeTableVersion, streamerConfig.checkpoint));
       }
     }
     return resumeCheckpoint;
   }
   
   ...
   
   
   public static Option<Pair<String, HoodieCommitMetadata>> getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline)
       throws IOException {
     return (Option<Pair<String, HoodieCommitMetadata>>) timeline.getReverseOrderedInstants().map(instant -> {
       try {
         HoodieCommitMetadata commitMetadata = timeline.readCommitMetadata(instant);
         if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY))
             || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))
             || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
             || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
           return Option.of(Pair.of(instant.toString(), commitMetadata));
         } else {
           return Option.empty();
         }
       } catch (IOException e) {
         throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
       }
     }).filter(Option::isPresent).findFirst().orElse(Option.empty());
   }
   {code}
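The per-topic lookup proposed in this comment could look roughly like the following standalone sketch. `ToyInstant` and `TopicCheckpointResolver` are hypothetical types, not Hudi classes. The sketch assumes a Kafka-style checkpoint string whose first comma-separated field is the topic name (e.g. `orders,0:100,1:200`), and it also assumes an inflight instant already records the checkpoint it is writing toward, which may not hold for real Hudi inflight files. It refuses to resolve a checkpoint while another inflight instant targets the same topic, mirroring the "exit gracefully" suggestion.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical instant: time, completion state, and the checkpoint it records (not a Hudi type). */
class ToyInstant {
  final String time;
  final boolean completed;
  final String checkpoint; // may be null when the commit carries no checkpoint

  ToyInstant(String time, boolean completed, String checkpoint) {
    this.time = time;
    this.completed = completed;
    this.checkpoint = checkpoint;
  }
}

class TopicCheckpointResolver {
  /** Assumes a Kafka-style checkpoint "topic,partition:offset,..." whose first field is the topic. */
  static boolean belongsToTopic(String checkpoint, String topic) {
    if (checkpoint == null || checkpoint.isEmpty()) {
      return false;
    }
    int comma = checkpoint.indexOf(',');
    String head = comma < 0 ? checkpoint : checkpoint.substring(0, comma);
    return head.equals(topic); // exact match, so "orders" does not match "orders-v2"
  }

  /**
   * Per-topic variant of the "latest commit with a valid checkpoint" lookup.
   * Throws if an inflight (not rolled back) instant already targets the topic,
   * so the caller can exit gracefully instead of racing another writer.
   */
  static Optional<String> latestCheckpointForTopic(List<ToyInstant> timelineOldestFirst, String topic) {
    for (ToyInstant instant : timelineOldestFirst) {
      if (!instant.completed && belongsToTopic(instant.checkpoint, topic)) {
        throw new IllegalStateException(
            "Inflight instant " + instant.time + " already targets topic " + topic);
      }
    }
    // Walk completed instants newest first, like getReverseOrderedInstants() above.
    for (int i = timelineOldestFirst.size() - 1; i >= 0; i--) {
      ToyInstant instant = timelineOldestFirst.get(i);
      if (instant.completed && belongsToTopic(instant.checkpoint, topic)) {
        return Optional.of(instant.checkpoint);
      }
    }
    return Optional.empty();
  }
}
```

Matching on the exact topic name rather than a string prefix keeps topics such as `orders` and `orders-v2` apart. Because each writer only looks for its own topic, commits written by the other deltastreamer are skipped instead of being mistaken for this writer's last checkpoint.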

