YuweiXiao commented on code in PR #6273:
URL: https://github.com/apache/hudi/pull/6273#discussion_r937664991
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -378,6 +378,27 @@ private void startInstant() {
this.conf.getString(FlinkOptions.TABLE_NAME),
conf.getString(FlinkOptions.TABLE_TYPE));
}
+  /**
+   * Returns the valid instant time of the last batch from the bootstrap events.
+   * Returns Option.empty() to indicate that the instant from the last batch is invalid.
+   */
+  protected Option<String> bootstrapInstantFromEventBuffer() {
+    ValidationUtils.checkArgument(Arrays.stream(eventBuffer)
+        .allMatch(evt -> evt != null && evt.isBootstrap()));
+    List<WriteMetadataEvent> events = Arrays.stream(eventBuffer)
+        .filter(evt -> !evt.getInstantTime().equals(""))
+        .collect(Collectors.toList());
+    String instant = events.stream().map(WriteMetadataEvent::getInstantTime)
+        .reduce((a, b) -> a.equals(b) ? a : "").orElse("");
+    // the instant and parallelism should be unique across the events
+    if (instant.equals("")) {
+      return Option.empty();
+    }
+    int parallelism = events.stream().map(WriteMetadataEvent::getParallelism)
+        .reduce((a, b) -> a.equals(b) ? a : -1).orElse(-1);
+    if (parallelism == -1) {
+      return Option.empty();
+    }
+
+    int totalNumOfMetadataStates = events.stream()
+        .mapToInt(WriteMetadataEvent::getNumOfMetadataState).sum();
Review Comment:
Hey Danny, it is true that the validation is not necessary if we use the
same `write.task` across runs. But that may not hold when users change the
write parallelism across runs.
For example, when a user reduces the parallelism from 8 to 4, the coordinator
will receive 4 merged events under the merging optimization you proposed. To
determine whether we can re-commit, we need to validate the events against 8
rather than only checking that all 4 event buffers are non-empty. This is also
why I now persist the parallelism into the writer state, since it may be needed
in the next run.
I am not sure whether there is a better solution, and the fix may not matter
in most cases. But for use cases where the source relies purely on the Flink
checkpoint to determine the consumption offset, mis-handling the re-commit
during restart may cause the loss of one batch of data on the Hudi side (or
duplicate data).
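To make the parallelism-change scenario concrete, here is a minimal standalone
sketch (not the actual Hudi code; `isRecommitSafe` and the class name are
hypothetical) of why counting non-empty event buffers is not enough: each
merged bootstrap event carries the number of metadata states it holds, and only
when their sum matches the parallelism persisted by the previous run do we know
that every old subtask's state was recovered.

```java
import java.util.Arrays;

// Hypothetical sketch illustrating the re-commit validation discussed above.
public class RecommitValidationSketch {

  // Only when the total number of recovered metadata states matches the
  // parallelism persisted by the previous run is the re-commit safe.
  static boolean isRecommitSafe(int[] numOfMetadataStates, int persistedParallelism) {
    int total = Arrays.stream(numOfMetadataStates).sum();
    return total == persistedParallelism;
  }

  public static void main(String[] args) {
    // Previous run wrote with parallelism 8; after a restart with parallelism 4,
    // the coordinator receives 4 merged events, each covering 2 old subtasks.
    int[] complete = {2, 2, 2, 2};
    // One old subtask's state was lost: the 4 events only cover 7 of 8 states,
    // even though all 4 event buffers are non-empty.
    int[] incomplete = {2, 2, 2, 1};

    System.out.println(isRecommitSafe(complete, 8));
    System.out.println(isRecommitSafe(incomplete, 8));
  }
}
```

In the incomplete case a check that merely verifies 4 non-empty buffers would
wrongly allow the re-commit, which is the batch-loss/duplication risk described
above.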
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]