This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8869b3b  [HUDI-1902] Clean the corrupted files generated by FlinkMergeAndReplaceHandle (#2949)
8869b3b is described below
commit 8869b3b4184bbec4502e2e3f6fde0ea9260cf0b0
Author: Danny Chan <[email protected]>
AuthorDate: Fri May 14 15:43:37 2021 +0800
[HUDI-1902] Clean the corrupted files generated by FlinkMergeAndReplaceHandle (#2949)
Make the intermediate files of FlinkMergeAndReplaceHandle hidden. When
committing the instant, clean these files up in case some corrupted files were
left behind (in the normal case, the intermediate files should be cleaned up by
the FlinkMergeAndReplaceHandle itself).
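For context, a minimal standalone sketch of the reconciliation this hooks into
(simplified, not the actual Hudi API; all names below are illustrative): marker
files record every data file a write attempt created, write stats record the
files the finished commit actually claims, and anything the markers saw but the
stats do not claim is a leftover that can be deleted safely.

    import java.util.HashSet;
    import java.util.Set;

    public class ReconcileSketch {
      // Marker paths: every data file some write attempt created.
      // Committed paths: the files the finished commit actually claims.
      // The difference is the set of duplicate/leftover files to delete.
      static Set<String> pathsToDelete(Set<String> markerPaths, Set<String> committedPaths) {
        Set<String> invalid = new HashSet<>(markerPaths);
        invalid.removeAll(committedPaths);
        return invalid;
      }
    }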
---
.../main/java/org/apache/hudi/table/HoodieTable.java | 9 ++++++++-
.../org/apache/hudi/io/FlinkMergeAndReplaceHandle.java | 3 ++-
.../apache/hudi/table/HoodieFlinkCopyOnWriteTable.java | 17 ++++++++++++-----
.../main/java/org/apache/hudi/util/FlinkClientUtil.java | 8 ++++++++
.../apache/hudi/sink/partitioner/BucketAssigner.java | 3 ++-
5 files changed, 32 insertions(+), 8 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index fd5321a..512518c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -480,6 +480,13 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
}
/**
+ * Returns the possibly invalid data file paths for the given marker files.
+ */
+ protected Set<String> getInvalidDataPaths(MarkerFiles markers) throws IOException {
+   return markers.createdAndMergedDataPaths(context, config.getFinalizeWriteParallelism());
+ }
+
+ /**
* Reconciles WriteStats and marker files to detect and safely delete duplicate data files created because of Spark
* retries.
*
@@ -505,7 +512,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
}
// we are not including log appends here, since they are already fail-safe.
- Set<String> invalidDataPaths = markers.createdAndMergedDataPaths(context, config.getFinalizeWriteParallelism());
+ Set<String> invalidDataPaths = getInvalidDataPaths(markers);
Set<String> validDataPaths = stats.stream()
.map(HoodieWriteStat::getPath)
.filter(p -> p.endsWith(this.getBaseFileExtension()))
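The change above extracts the candidate computation into a protected hook so
engine-specific tables can narrow it. A hedged sketch of the pattern, with
simplified stand-in classes rather than the real Hudi types:

    import java.util.Set;
    import java.util.stream.Collectors;

    class BaseTableSketch {
      // Default: every data path the markers recorded is a candidate for deletion.
      protected Set<String> getInvalidDataPaths(Set<String> markerDataPaths) {
        return markerDataPaths;
      }
    }

    class FlinkTableSketch extends BaseTableSketch {
      // Flink handles clean their own retry files, so only the hidden
      // (dot-prefixed) intermediate files remain suspect.
      @Override
      protected Set<String> getInvalidDataPaths(Set<String> markerDataPaths) {
        return super.getInvalidDataPaths(markerDataPaths).stream()
            .filter(p -> p.substring(p.lastIndexOf('/') + 1).startsWith("."))
            .collect(Collectors.toSet());
      }
    }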
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
index 44d630a..c87f3dd 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandle.java
@@ -128,8 +128,9 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
*/
protected String newFileNameWithRollover(int rollNumber) {
// make the intermediate file as hidden
+ final String fileID = "." + this.fileId;
return FSUtils.makeDataFileName(instantTime, writeToken + "-" + rollNumber,
- this.fileId, hoodieTable.getBaseFileExtension());
+ fileID, hoodieTable.getBaseFileExtension());
}
@Override
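For illustration, assuming the usual Hudi base-file naming of
<fileId>_<writeToken>_<instantTime><ext>, the dot prefix above turns a rollover
file into a hidden file that the commit-time filter can later recognize. The
values below are made up:

    public class HiddenNameSketch {
      public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        String fileId = "f1";
        String writeToken = "0-1-0";
        String instantTime = "20210514154337";
        int rollNumber = 1;
        // Finalized base file name: f1_0-1-0_20210514154337.parquet
        String finalName = fileId + "_" + writeToken + "_" + instantTime + ".parquet";
        // Hidden intermediate after the change: .f1_0-1-0-1_20210514154337.parquet
        String intermediate = "." + fileId + "_" + writeToken + "-" + rollNumber + "_" + instantTime + ".parquet";
        System.out.println(finalName + " vs " + intermediate);
      }
    }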
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index cb8303e..7fc3afc 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -31,12 +31,10 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieCreateHandle;
@@ -56,6 +54,7 @@ import org.apache.hudi.table.action.commit.FlinkMergeHelper;
import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.util.FlinkClientUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,6 +64,10 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
/**
* Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with
@@ -321,9 +324,13 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
}
@Override
- public void finalizeWrite(HoodieEngineContext context, String instantTs, List<HoodieWriteStat> stats) throws HoodieIOException {
-   // do nothing because flink create and merge handles can clean the
-   // retry files by themselves.
+ protected Set<String> getInvalidDataPaths(MarkerFiles markers) throws IOException {
+   // keep only the intermediate files generated by FlinkMergeAndReplaceHandle.
+   return super.getInvalidDataPaths(markers).stream()
+       .filter(path -> {
+         final String fileName = FlinkClientUtil.parseFileName(path);
+         return fileName.startsWith(".") && fileName.endsWith(PARQUET.getFileExtension());
+       }).collect(Collectors.toSet());
}
// -------------------------------------------------------------------------
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
index c38c1f1..4112e2b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
@@ -30,6 +30,14 @@ import java.io.File;
public class FlinkClientUtil {
/**
+ * Parses the file name from the given path.
+ */
+ public static String parseFileName(String path) {
+ int slash = path.lastIndexOf(Path.SEPARATOR);
+ return path.substring(slash + 1);
+ }
+
+ /**
* Returns the hadoop configuration with possible hadoop conf paths.
* E.G. the configurations under path $HADOOP_CONF_DIR and $HADOOP_HOME.
*/
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index d89ad83..fab21d9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -241,7 +241,8 @@ public class BucketAssigner {
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
for (HoodieBaseFile file : allFiles) {
- if (file.getFileSize() < config.getParquetSmallFileLimit()) {
+ // filter out the corrupted files.
+ if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
String filename = file.getFileName();
SmallFile sf = new SmallFile();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));