This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 05606708fa [HUDI-4393] Add marker file for target file when flink merge handle rolls over (#6103)
05606708fa is described below
commit 05606708fabdd3b6414cce04802ac85617976d04
Author: Danny Chan <[email protected]>
AuthorDate: Thu Jul 14 16:00:08 2022 +0800
[HUDI-4393] Add marker file for target file when flink merge handle rolls over (#6103)
---
.../java/org/apache/hudi/table/HoodieTable.java | 2 +-
.../apache/hudi/io/FlinkMergeAndReplaceHandle.java | 2 +-
.../java/org/apache/hudi/io/FlinkMergeHandle.java | 28 +++++-----------------
3 files changed, 8 insertions(+), 24 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 57e816619f..1e68f820d9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -655,7 +655,7 @@ public abstract class HoodieTable<T extends
HoodieRecordPayload, I, K, O> implem
invalidDataPaths.removeAll(validDataPaths);
if (!invalidDataPaths.isEmpty()) {
- LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);
+ LOG.info("Removing duplicate data files created due to task retries before committing. Paths=" + invalidDataPaths);
Map<String, List<Pair<String, String>>> invalidPathsByPartition =
invalidDataPaths.stream()
.map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(),
new Path(basePath, dp).toString()))
.collect(Collectors.groupingBy(Pair::getKey));
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
index cf912f620a..9fea0a9718 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -167,7 +167,7 @@ public class FlinkMergeAndReplaceHandle<T extends
HoodieRecordPayload, I, K, O>
try {
fs.rename(newFilePath, oldFilePath);
} catch (IOException e) {
- throw new HoodieIOException("Error while renaming the temporary roll file: "
+ throw new HoodieIOException("Error while renaming the temporary rollover file: "
+ newFilePath + " to old base file name: " + oldFilePath, e);
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 1bff89713b..99f111c82f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -43,7 +43,7 @@ import java.util.List;
/**
* A {@link HoodieMergeHandle} that supports MERGE write incrementally(small
data buffers).
*
- * <p>For a new data buffer, it initialize and set up the next file path to write,
+ * <p>For a new data buffer, it initializes and set up the next file path to write,
* and closes the file path when the data buffer write finish. When next data
buffer
* write starts, it rolls over to another new file. If all the data buffers
write finish
* for a checkpoint round, it renames the last new file path as the desired
file name
@@ -143,8 +143,7 @@ public class FlinkMergeHandle<T extends
HoodieRecordPayload, I, K, O>
break;
}
- oldFilePath = newFilePath; // override the old file name
- rolloverPaths.add(oldFilePath);
+ rolloverPaths.add(newFilePath);
newFileName = newFileNameWithRollover(rollNumber++);
newFilePath = makeNewFilePath(partitionPath, newFileName);
LOG.warn("Duplicate write for MERGE bucket with path: " + oldFilePath
+ ", rolls over to new path: " + newFilePath);
@@ -162,13 +161,6 @@ public class FlinkMergeHandle<T extends
HoodieRecordPayload, I, K, O>
this.fileId, hoodieTable.getBaseFileExtension());
}
- @Override
- protected void setWriteStatusPath() {
- // if there was rollover, should set up the path as the initial new file
path.
- Path path = rolloverPaths.size() > 0 ? rolloverPaths.get(0) : newFilePath;
- writeStatus.getStat().setPath(new Path(config.getBasePath()), path);
- }
-
@Override
public List<WriteStatus> close() {
try {
@@ -188,27 +180,19 @@ public class FlinkMergeHandle<T extends
HoodieRecordPayload, I, K, O>
public void finalizeWrite() {
// The file visibility should be kept by the configured ConsistencyGuard
instance.
- rolloverPaths.add(newFilePath);
- if (rolloverPaths.size() == 1) {
+ if (rolloverPaths.size() == 0) {
// only one flush action, no need to roll over
return;
}
- for (int i = 0; i < rolloverPaths.size() - 1; i++) {
- Path path = rolloverPaths.get(i);
+ for (Path path : rolloverPaths) {
try {
fs.delete(path, false);
+ LOG.info("Delete the rollover data file: " + path + " success!");
} catch (IOException e) {
- throw new HoodieIOException("Error when clean the temporary roll file: " + path, e);
+ throw new HoodieIOException("Error when clean the temporary rollover data file: " + path, e);
}
}
- final Path lastPath = rolloverPaths.get(rolloverPaths.size() - 1);
- final Path desiredPath = rolloverPaths.get(0);
- try {
- fs.rename(lastPath, desiredPath);
- } catch (IOException e) {
- throw new HoodieIOException("Error when rename the temporary roll file: " + lastPath + " to: " + desiredPath, e);
- }
}
@Override