This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new efb9719018 [HUDI-4332] The current instant may be wrong under some
extreme conditions in AppendWriteFunction. (#5988)
efb9719018 is described below
commit efb971901818479e37714fab3b655ed3dc57477b
Author: BruceLin <[email protected]>
AuthorDate: Tue Jun 28 20:42:26 2022 +0800
[HUDI-4332] The current instant may be wrong under some extreme conditions
in AppendWriteFunction. (#5988)
---
.../org/apache/hudi/sink/append/AppendWriteFunction.java | 15 +++++++--------
1 file changed, 7 insertions(+), 8 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 7b40718b35..6e275dc8c8 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -106,30 +106,29 @@ public class AppendWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
// Utilities
// -------------------------------------------------------------------------
private void initWriterHelper() {
- this.currentInstant = instantToWrite(true);
- if (this.currentInstant == null) {
+ final String instant = instantToWrite(true);
+ if (instant == null) {
// in case there are empty checkpoints that has no input data
throw new HoodieException("No inflight instant when flushing data!");
}
this.writerHelper = new BulkInsertWriterHelper(this.config,
this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
- this.currentInstant, this.taskID,
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getAttemptNumber(),
+ instant, this.taskID,
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getAttemptNumber(),
this.rowType);
}
private void flushData(boolean endInput) {
final List<WriteStatus> writeStatus;
- final String instant;
if (this.writerHelper != null) {
writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
- instant = this.writerHelper.getInstantTime();
+ this.currentInstant = this.writerHelper.getInstantTime();
} else {
writeStatus = Collections.emptyList();
- instant = instantToWrite(false);
- LOG.info("No data to write in subtask [{}] for instant [{}]", taskID,
instant);
+ this.currentInstant = instantToWrite(false);
+ LOG.info("No data to write in subtask [{}] for instant [{}]", taskID,
this.currentInstant);
}
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
- .instantTime(instant)
+ .instantTime(this.currentInstant)
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(endInput)