This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6aec9d754f [HUDI-4408] Reuse old rollover file as base file for flink
merge handle (#6120)
6aec9d754f is described below
commit 6aec9d754f4a091136c2dba9643b27ae66db255b
Author: Danny Chan <[email protected]>
AuthorDate: Sat Jul 16 20:46:23 2022 +0800
[HUDI-4408] Reuse old rollover file as base file for flink merge handle
(#6120)
---
.../java/org/apache/hudi/io/FlinkMergeHandle.java | 22 ++++++++++++++++++++--
.../org/apache/hudi/sink/StreamWriteFunction.java | 12 +-----------
2 files changed, 21 insertions(+), 13 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 99f111c82f..69121a9a04 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -143,7 +143,13 @@ public class FlinkMergeHandle<T extends
HoodieRecordPayload, I, K, O>
break;
}
- rolloverPaths.add(newFilePath);
+ // Override the old file name.
+ // In rare cases, when a checkpoint was aborted and the instant time
+ // is reused, the merge handle generates a new file name
+ // with the reused instant time of the last checkpoint, which is a duplicate;
+ // use the file with the same name as the new base file to avoid data loss.
+ oldFilePath = newFilePath;
+ rolloverPaths.add(oldFilePath);
newFileName = newFileNameWithRollover(rollNumber++);
newFilePath = makeNewFilePath(partitionPath, newFileName);
LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath
+ ", rolls over to new path: " + newFilePath);
@@ -161,6 +167,12 @@ public class FlinkMergeHandle<T extends
HoodieRecordPayload, I, K, O>
this.fileId, hoodieTable.getBaseFileExtension());
}
+ @Override
+ protected void setWriteStatusPath() {
+ // if there was a rollover, set the path to the initial new file path.
+ writeStatus.getStat().setPath(new Path(config.getBasePath()),
getWritePath());
+ }
+
@Override
public List<WriteStatus> close() {
try {
@@ -193,6 +205,12 @@ public class FlinkMergeHandle<T extends
HoodieRecordPayload, I, K, O>
throw new HoodieIOException("Error when clean the temporary rollover
data file: " + path, e);
}
}
+ final Path desiredPath = rolloverPaths.get(0);
+ try {
+ fs.rename(newFilePath, desiredPath);
+ } catch (IOException e) {
+ throw new HoodieIOException("Error when rename the temporary roll file:
" + newFilePath + " to: " + desiredPath, e);
+ }
}
@Override
@@ -216,6 +234,6 @@ public class FlinkMergeHandle<T extends
HoodieRecordPayload, I, K, O>
@Override
public Path getWritePath() {
- return newFilePath;
+ return rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath;
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index bbaba04144..2748af5290 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -408,16 +408,6 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
&& this.buckets.values().stream().anyMatch(bucket ->
bucket.records.size() > 0);
}
- private void cleanWriteHandles() {
- if (freshInstant(currentInstant)) {
- // In rare cases, when a checkpoint was aborted and the instant time
- // is reused, the merge handle generates a new file name
- // with the reused instant time of last checkpoint, the write handles
- // should be kept and reused in case data loss.
- this.writeClient.cleanHandles();
- }
- }
-
@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite(true);
@@ -489,7 +479,7 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
this.tracer.reset();
- cleanWriteHandles();
+ this.writeClient.cleanHandles();
this.writeStatuses.addAll(writeStatus);
// blocks flushing until the coordinator starts a new instant
this.confirming = true;