zhangyue19921010 commented on code in PR #8673:
URL: https://github.com/apache/hudi/pull/8673#discussion_r1442531234
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -423,25 +431,41 @@ private void initInstant(String instant) {
if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy() &&
!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
writeClient.getHeartbeatClient().stop(this.instant);
}
- // starts a new instant
- startInstant();
+ this.instant = ckpMetadata.lastPendingInstant();
+ if (writerCurInstants.stream().allMatch(this::isLessThanCurInstant)) {
+ LOG.info("Current instant " + instant + " is new, reuse current instant.");
+ } else if (isBootstrapOrEmpty(this.instant) ||
writerCurInstants.stream().allMatch(i -> this.instant.equals(i))) {
+ // starts a new instant
+ startInstant();
+ } else {
+ LOG.warn("Current instant: " + this.instant + ", ignore events with current instants: " + writerCurInstants);
+ }
// upgrade downgrade
this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
}
- private void handleBootstrapEvent(WriteMetadataEvent event) {
- this.eventBuffer[event.getTaskID()] = event;
- if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null &&
evt.isBootstrap())) {
+ private boolean isLessThanCurInstant(String writerCurInstant) {
Review Comment:
I am trying to understand the logic here: the current ts is reused only
when it is valid (non-bootstrap, non-empty) and the writer ts is either
bootstrap or smaller than the current ts. Is that correct?
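
To make that reading concrete, here is a minimal sketch of the condition as I understand it. It is not the PR's code: the class name, `canReuse`, and the two-argument helpers are hypothetical, the empty string is assumed to stand in for `WriteMetadataEvent.BOOTSTRAP_INSTANT`, and plain `String.compareTo` is assumed to stand in for `HoodieTimeline.compareTimestamps(..., LESSER_THAN, ...)`.

```java
import java.util.List;

public class InstantReuseSketch {
  // Assumption: stand-in for WriteMetadataEvent.BOOTSTRAP_INSTANT.
  static final String BOOTSTRAP_INSTANT = "";

  // Hypothetical helper: an instant is "bootstrap or empty" if it is null, empty,
  // or equal to the bootstrap marker.
  static boolean isBootstrapOrEmpty(String instant) {
    return instant == null || instant.isEmpty() || BOOTSTRAP_INSTANT.equals(instant);
  }

  // True when the writer's instant is bootstrap/empty or strictly older than the
  // current instant. Assumption: instants are fixed-width timestamp strings, so
  // lexicographic order matches chronological order.
  static boolean isLessThanCurInstant(String writerInstant, String curInstant) {
    return isBootstrapOrEmpty(writerInstant) || writerInstant.compareTo(curInstant) < 0;
  }

  // The current instant is reused only if it is valid and every writer is behind it.
  static boolean canReuse(List<String> writerInstants, String curInstant) {
    return !isBootstrapOrEmpty(curInstant)
        && writerInstants.stream().allMatch(w -> isLessThanCurInstant(w, curInstant));
  }

  public static void main(String[] args) {
    // One bootstrap writer and one older writer: the current instant is reused.
    System.out.println(canReuse(List.of("", "20240101000000"), "20240102000000")); // true
    // A writer already on the current instant: no reuse.
    System.out.println(canReuse(List.of("20240102000000"), "20240102000000")); // false
    // The current instant itself is bootstrap: no reuse.
    System.out.println(canReuse(List.of(""), "")); // false
  }
}
```

If this matches the intent, the second case (writer ts equal to the current ts) is the one that falls through to the `startInstant()` branch in the diff.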
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -423,25 +431,41 @@ private void initInstant(String instant) {
if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy() &&
!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
writeClient.getHeartbeatClient().stop(this.instant);
}
- // starts a new instant
- startInstant();
+ this.instant = ckpMetadata.lastPendingInstant();
+ if (writerCurInstants.stream().allMatch(this::isLessThanCurInstant)) {
+ LOG.info("Current instant " + instant + " is new, reuse current instant.");
+ } else if (isBootstrapOrEmpty(this.instant) ||
writerCurInstants.stream().allMatch(i -> this.instant.equals(i))) {
+ // starts a new instant
+ startInstant();
+ } else {
+ LOG.warn("Current instant: " + this.instant + ", ignore events with current instants: " + writerCurInstants);
+ }
// upgrade downgrade
this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
}
- private void handleBootstrapEvent(WriteMetadataEvent event) {
- this.eventBuffer[event.getTaskID()] = event;
- if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null &&
evt.isBootstrap())) {
+ private boolean isLessThanCurInstant(String writerCurInstant) {
Review Comment:
My question is why we need to check
`HoodieTimeline.compareTimestamps(writerCurInstant, HoodieTimeline.LESSER_THAN, this.instant)`
here. Could you please help me understand it?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java:
##########
@@ -453,7 +454,7 @@ private boolean flushBucket(DataBucket bucket) {
.endInput(false)
.build();
- this.eventGateway.sendEventToCoordinator(event);
+ this.eventGateway.sendEventToCoordinator(new WriteResultEvent(event,
currentInstant));
Review Comment:
It looks like we build a new WriteResultEvent instance to wrap event and
currentInstant, but event already contains the currentInstant information.
Do we need to introduce this WriteResultEvent? Is it possible for
currentInstant and the instant in event to differ?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -408,6 +413,9 @@ private void startInstant() {
*/
private void initInstant(String instant) {
HoodieTimeline completedTimeline =
this.metaClient.getActiveTimeline().filterCompletedInstants();
+ List<String> writerCurInstants = Arrays.stream(eventBuffer)
Review Comment:
The same question: could we take the instant directly from each event and
use those as writerCurInstants?