[https://issues.apache.org/jira/browse/HUDI-5077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18010903#comment-18010903]
Harsha Gudladona commented on HUDI-5077:
----------------------------------------
We are interested in this feature and willing to work on it as well. We have
been thinking through the current implementation (below). Wouldn't it be
simpler to find the latest commit that matches the deltastreamer's topic? In
other words, getLatestInstantAndCommitMetadataWithValidCheckpointInfo would
become getLatestInstantAndCommitMetadataWithValidCheckpointInfoForTopic. We can
make it the user's responsibility to ensure that two deltastreamers do not run
for the same topic; we could even go further and exit gracefully if there is an
inflight commit (not rolled back) for the same topic.
{code:java}
@VisibleForTesting
static Option<Checkpoint> resolveCheckpointBetweenConfigAndPrevCommit(HoodieTimeline commitsTimeline,
                                                                       HoodieStreamer.Config streamerConfig,
                                                                       TypedProperties props) throws IOException {
  Option<Checkpoint> resumeCheckpoint = Option.empty();
  // If this is a MOR table that has deltacommits, we should get the checkpoint from the .deltacommit.
  // If changing from MOR to COW, a full compaction must be done before the change,
  // so we can only consider .commit in that case.
  if (streamerConfig.tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
    // Try to get the checkpoint from commits (including commit and deltacommit).
    // In the COW-to-MOR migration case, the first batch of the deltastreamer would
    // lose the checkpoint from the COW table, causing data loss.
    HoodieTimeline deltaCommitTimeline = commitsTimeline.filter(instant ->
        instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
    if (!deltaCommitTimeline.empty()) {
      commitsTimeline = deltaCommitTimeline;
    }
  }
  Option<HoodieInstant> lastCommit = commitsTimeline.lastInstant();
  if (lastCommit.isPresent()) {
    // If the previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
    Option<HoodieCommitMetadata> commitMetadataOption = getLatestCommitMetadataWithValidCheckpointInfo(commitsTimeline);
    int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
    if (commitMetadataOption.isPresent()) {
      HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
      Checkpoint checkpointFromCommit = CheckpointUtils.getCheckpoint(commitMetadata);
      LOG.debug("Checkpoint reset from metadata: " + checkpointFromCommit.getCheckpointResetKey());
      if (ignoreCkpCfgPrevailsOverCkpFromPrevCommit(streamerConfig, checkpointFromCommit)) {
        // we ignore any existing checkpoint and start ingesting afresh
        resumeCheckpoint = Option.empty();
      } else if (ckpOverrideCfgPrevailsOverCkpFromPrevCommit(streamerConfig, checkpointFromCommit)) {
        resumeCheckpoint = Option.of(buildCheckpointFromConfigOverride(
            streamerConfig.sourceClassName, writeTableVersion, streamerConfig.checkpoint));
      } else if (shouldUseCkpFromPrevCommit(checkpointFromCommit)) {
        // if the previous checkpoint is an empty string, skip resuming and use Option.empty()
        resumeCheckpoint = Option.of(checkpointFromCommit);
      } else if (compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
          LESSER_THAN, lastCommit.get().requestedTime())) {
        throw new HoodieStreamerException(
            "Unable to find previous checkpoint. Please double check if this table "
                + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
                + commitsTimeline.getInstants());
      }
      // KAFKA_CHECKPOINT_TYPE will be honored only for the first batch.
      if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))) {
        removeConfigFromProps(props, KafkaSourceConfig.KAFKA_CHECKPOINT_TYPE);
      }
    } else if (streamerConfig.checkpoint != null) {
      // getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set.
      resumeCheckpoint = Option.of(buildCheckpointFromConfigOverride(streamerConfig.sourceClassName,
          writeTableVersion, streamerConfig.checkpoint));
    }
  }
  return resumeCheckpoint;
}
...
public static Option<Pair<String, HoodieCommitMetadata>> getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline)
    throws IOException {
  return (Option<Pair<String, HoodieCommitMetadata>>) timeline.getReverseOrderedInstants().map(instant -> {
    try {
      HoodieCommitMetadata commitMetadata = timeline.readCommitMetadata(instant);
      if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_KEY))
          || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieStreamer.CHECKPOINT_RESET_KEY))
          || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
          || !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
        return Option.of(Pair.of(instant.toString(), commitMetadata));
      } else {
        return Option.empty();
      }
    } catch (IOException e) {
      throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
    }
  }).filter(Option::isPresent).findFirst().orElse(Option.empty());
}
{code}
> Supporting multiple deltastreamers writing to a single hudi table
> -----------------------------------------------------------------
>
> Key: HUDI-5077
> URL: https://issues.apache.org/jira/browse/HUDI-5077
> Project: Apache Hudi
> Issue Type: Improvement
> Components: deltastreamer
> Reporter: sivabalan narayanan
> Priority: Major
>
> As of now, only a single deltastreamer can write to a given Hudi
> table. We have an ask from the community to allow two deltastreamers to write
> to a single table.
>
> Things required to be fixed:
> # We need to fix the checkpointing to hold multiple key-value pairs, where the
> key represents a unique identifier for the deltastreamer client and the value
> represents its checkpoint. We might need to introduce a new notion of an
> identifier for each deltastreamer in this case.
> # Within delta sync, after writeClient.upsert and before calling
> writeClient.commit, we need to update the checkpoint value. For this, we
> might need to take a lock, fetch the latest checkpoint from the timeline
> (since there could be multiple writers), update the checkpoint, and then
> release the lock.
>
> These are the changes I can think of. While implementing it, some more minor
> fixes may turn out to be required.
>
> ask from a user: https://github.com/apache/hudi/issues/6718
>
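The two fixes listed in the issue description (a checkpoint map keyed by writer identifier, and a lock-protected read-merge-write of that map before commit) can be sketched as below. Everything here is hypothetical: MultiWriterCheckpoint, CheckpointCommitter and the "writerId=checkpoint;..." encoding are illustrative only, an in-process ReentrantLock stands in for whatever distributed lock provider Hudi is configured with, and a field stands in for reading the latest checkpoint from the timeline. Writer ids are assumed to contain neither ';' nor '=', and checkpoints are assumed not to contain ';'.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical encoding of per-writer checkpoints in a single metadata value.
final class MultiWriterCheckpoint {
  // Sorted by writer id so the serialized form is deterministic.
  static String serialize(Map<String, String> checkpoints) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : new TreeMap<>(checkpoints).entrySet()) {
      if (sb.length() > 0) {
        sb.append(';');
      }
      sb.append(e.getKey()).append('=').append(e.getValue());
    }
    return sb.toString();
  }

  static Map<String, String> deserialize(String value) {
    Map<String, String> result = new TreeMap<>();
    if (value == null || value.isEmpty()) {
      return result;
    }
    for (String entry : value.split(";")) {
      int eq = entry.indexOf('=');
      if (eq > 0) {
        // Split on the first '=' only, so the checkpoint itself may contain '='.
        result.put(entry.substring(0, eq), entry.substring(eq + 1));
      }
    }
    return result;
  }
}

final class CheckpointCommitter {
  // Stand-in for a distributed lock shared by all writers (assumption).
  private final ReentrantLock lock = new ReentrantLock();
  // Stand-in for the checkpoint metadata of the latest completed commit.
  private String latestCheckpointValue = "";

  // Between upsert and commit: lock, re-read the latest merged checkpoints,
  // overwrite only this writer's entry, publish the merged value, unlock.
  String commitWithCheckpoint(String writerId, String newCheckpoint) {
    lock.lock();
    try {
      Map<String, String> merged = MultiWriterCheckpoint.deserialize(latestCheckpointValue);
      merged.put(writerId, newCheckpoint);
      latestCheckpointValue = MultiWriterCheckpoint.serialize(merged);
      return latestCheckpointValue;
    } finally {
      lock.unlock();
    }
  }
}
```

Re-reading the latest value inside the lock, rather than reusing what the writer saw when its batch started, is what keeps one writer from overwriting another writer's checkpoint entry.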