This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b98c9ab [HUDI-1895] Close the file handles gracefully for flink write
function to avoid corrupted files (#2938)
b98c9ab is described below
commit b98c9ab4392727b6b9731aea88deaa5337a91dcf
Author: Danny Chan <[email protected]>
AuthorDate: Wed May 12 18:44:10 2021 +0800
[HUDI-1895] Close the file handles gracefully for flink write function to
avoid corrupted files (#2938)
---
.../apache/hudi/client/HoodieFlinkWriteClient.java | 22 +++++++-----
.../java/org/apache/hudi/io/FlinkAppendHandle.java | 41 ++++++++++++++++------
.../java/org/apache/hudi/io/FlinkCreateHandle.java | 16 +++++++++
.../java/org/apache/hudi/io/FlinkMergeHandle.java | 16 +++++++++
.../java/org/apache/hudi/io/MiniBatchHandle.java | 17 ++++++++-
.../delta/BaseFlinkDeltaCommitActionExecutor.java | 2 +-
.../org/apache/hudi/sink/StreamWriteFunction.java | 2 +-
.../hudi/sink/StreamWriteOperatorCoordinator.java | 2 +-
.../org/apache/hudi/table/HoodieTableFactory.java | 14 ++++----
.../apache/hudi/table/HoodieDataSourceITCase.java | 5 ++-
10 files changed, 103 insertions(+), 34 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index a84e116..6cbbeec 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -380,9 +380,19 @@ public class HoodieFlinkWriteClient<T extends
HoodieRecordPayload> extends
* would close the underneath file handles.
*/
public void cleanHandles() {
- this.bucketToHandles.values().forEach(handle -> {
- ((MiniBatchHandle) handle).finishWrite();
- });
+ this.bucketToHandles.values()
+ .forEach(handle -> ((MiniBatchHandle) handle).finishWrite());
+ this.bucketToHandles.clear();
+ }
+
+ /**
+ * Clean the write handles within a checkpoint interval, this operation
+ * would close the underlying file handles; if any error happens, clean up the
+ * corrupted data file.
+ */
+ public void cleanHandlesGracefully() {
+ this.bucketToHandles.values()
+ .forEach(handle -> ((MiniBatchHandle) handle).closeGracefully());
this.bucketToHandles.clear();
}
@@ -438,12 +448,6 @@ public class HoodieFlinkWriteClient<T extends
HoodieRecordPayload> extends
return table;
}
- public List<String> getInflightsAndRequestedInstants(String commitType) {
- HoodieTimeline unCompletedTimeline =
getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
- return unCompletedTimeline.getInstants().filter(x ->
x.getAction().equals(commitType)).map(HoodieInstant::getTimestamp)
- .collect(Collectors.toList());
- }
-
public String getLastPendingInstant(HoodieTableType tableType) {
final String actionType = CommitUtils.getCommitActionType(tableType);
return getLastPendingInstant(actionType);
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index bc6eba8..c859d26 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,12 +47,12 @@ import java.util.List;
* @param <K> Key type
* @param <O> Output type
*/
-public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
HoodieAppendHandle<T, I, K, O> implements MiniBatchHandle {
+public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
+ extends HoodieAppendHandle<T, I, K, O> implements MiniBatchHandle {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkAppendHandle.class);
- private boolean needBootStrap = true;
- // Total number of bytes written to file
- private long sizeInBytes = 0;
+
+ private boolean shouldRollover = false;
public FlinkAppendHandle(
HoodieWriteConfig config,
@@ -65,6 +66,17 @@ public class FlinkAppendHandle<T extends
HoodieRecordPayload, I, K, O> extends H
}
@Override
+ protected void createMarkerFile(String partitionPath, String dataFileName) {
+ // In some rare cases, the task was pulled up again with the same write file
name,
+ // for e.g, reuse the small log files from last commit instant.
+
+ // Just skip the marker file creation if it already exists, the new data
would append to
+ // the file directly.
+ MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
+ markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
+ }
+
+ @Override
protected boolean needsUpdateLocation() {
return false;
}
@@ -80,12 +92,8 @@ public class FlinkAppendHandle<T extends
HoodieRecordPayload, I, K, O> extends H
return true;
}
- /**
- * Returns whether there is need to bootstrap this file handle.
- * E.G. the first time that the handle is created.
- */
- public boolean isNeedBootStrap() {
- return this.needBootStrap;
+ public boolean shouldRollover() {
+ return this.shouldRollover;
}
/**
@@ -98,7 +106,7 @@ public class FlinkAppendHandle<T extends
HoodieRecordPayload, I, K, O> extends H
@Override
public List<WriteStatus> close() {
- needBootStrap = false;
+ shouldRollover = true;
// flush any remaining records to disk
appendDataAndDeleteBlocks(header);
// need to fix that the incremental write size in bytes may be lost
@@ -118,4 +126,15 @@ public class FlinkAppendHandle<T extends
HoodieRecordPayload, I, K, O> extends H
throw new HoodieUpsertException("Failed to close append handle", e);
}
}
+
+ @Override
+ public void closeGracefully() {
+ try {
+ finishWrite();
+ } catch (Throwable throwable) {
+ // The intermediate log file can still be appended to, based on the incremental
MERGE semantics,
+ // so there is no need to delete the file.
+ LOG.warn("Error while trying to dispose the APPEND handle", throwable);
+ }
+ }
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
index fc0f8ef..fe5bf99 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
@@ -185,6 +185,22 @@ public class FlinkCreateHandle<T extends
HoodieRecordPayload, I, K, O>
}
}
+ @Override
+ public void closeGracefully() {
+ try {
+ finishWrite();
+ } catch (Throwable throwable) {
+ LOG.warn("Error while trying to dispose the CREATE handle", throwable);
+ try {
+ fs.delete(path, false);
+ LOG.info("Deleting the intermediate CREATE data file: " + path + "
success!");
+ } catch (IOException e) {
+ // logging a warning and ignore the exception.
+ LOG.warn("Deleting the intermediate CREATE data file: " + path + "
failed", e);
+ }
+ }
+ }
+
/**
* Performs actions to durably, persist the current changes and returns a
WriteStatus object.
*/
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 023ee5f..518ea69 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -238,4 +238,20 @@ public class FlinkMergeHandle<T extends
HoodieRecordPayload, I, K, O>
throw new HoodieIOException("Error when rename the temporary roll file:
" + lastPath + " to: " + desiredPath, e);
}
}
+
+ @Override
+ public void closeGracefully() {
+ try {
+ finishWrite();
+ } catch (Throwable throwable) {
+ LOG.warn("Error while trying to dispose the MERGE handle", throwable);
+ try {
+ fs.delete(newFilePath, false);
+ LOG.info("Deleting the intermediate MERGE data file: " + newFilePath +
" success!");
+ } catch (IOException e) {
+ // logging a warning and ignore the exception.
+ LOG.warn("Deleting the intermediate MERGE data file: " + newFilePath +
" failed", e);
+ }
+ }
+ }
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
index 2cae807..30ac317 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
@@ -22,9 +22,24 @@ package org.apache.hudi.io;
* Hoodie write handle that supports write as mini-batch.
*/
public interface MiniBatchHandle {
+
+ /**
+ * Returns whether the handle should roll over to a new one,
+ * E.G. the handle has written some intermediate data buffer already.
+ */
+ default boolean shouldRollover() {
+ return false;
+ }
+
/**
* Finish the write of multiple mini-batches. Usually these mini-bathes
- * come from a checkpoint interval.
+ * come from one checkpoint interval.
*/
void finishWrite();
+
+ /**
+ * Close the file handle gracefully, if any error happens during the file
handle close,
clean up the file so as not to leave a corrupted file.
+ */
+ void closeGracefully();
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
index 8d0857e..674a8fd 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java
@@ -49,7 +49,7 @@ public abstract class BaseFlinkDeltaCommitActionExecutor<T
extends HoodieRecordP
@Override
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String
fileId, Iterator<HoodieRecord<T>> recordItr) {
FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle;
- if (!appendHandle.isNeedBootStrap()) {
+ if (appendHandle.shouldRollover()) {
appendHandle.appendNewRecords(recordItr);
}
appendHandle.doAppend();
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 5a04a35..d9cb389 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -200,7 +200,7 @@ public class StreamWriteFunction<K, I, O>
@Override
public void close() {
if (this.writeClient != null) {
- this.writeClient.cleanHandles();
+ this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
}
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 6244d65..ddbb3dd 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -249,7 +249,7 @@ public class StreamWriteOperatorCoordinator
"The coordinator can only handle BatchWriteSuccessEvent");
BatchWriteSuccessEvent event = (BatchWriteSuccessEvent)
operatorEvent;
// the write task does not block after checkpointing(and before it
receives a checkpoint success event),
- // if it it checkpoints succeed then flushes the data buffer again
before this coordinator receives a checkpoint
+ // if it checkpoints succeed then flushes the data buffer again
before this coordinator receives a checkpoint
// success event, the data buffer would flush with an older instant
time.
ValidationUtils.checkState(
HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 02ab280..808e9c7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -102,7 +102,7 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
// Utilities
// -------------------------------------------------------------------------
- /** Validate required options. e.g record key and pre combine key.
+ /** Validate required options. For e.g, record key and pre_combine key.
*
* @param conf The table options
* @param schema The table schema
@@ -115,17 +115,17 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
Arrays.stream(conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","))
.filter(field -> !fields.contains(field))
.findAny()
- .ifPresent(e -> {
- throw new ValidationException("The " + e + " field not exists in
table schema."
- + "Please define primary key or modify
hoodie.datasource.write.recordkey.field option.");
+ .ifPresent(f -> {
+ throw new ValidationException("Field '" + f + "' does not exist in
the table schema."
+ + "Please define primary key or modify
'hoodie.datasource.write.recordkey.field' option.");
});
}
- // validate pre combine key
+ // validate pre_combine key
String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
if (!fields.contains(preCombineField)) {
- throw new ValidationException("The " + preCombineField + " field not
exists in table schema."
- + "Please check write.precombine.field option.");
+ throw new ValidationException("Field " + preCombineField + " does not
exist in the table schema."
+ + "Please check 'write.precombine.field' option.");
}
}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 69627f2..41c587c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -392,9 +392,8 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]");
}
- @ParameterizedTest
- @EnumSource(value = ExecMode.class)
- void testStreamReadEmptyTablePath(ExecMode execMode) throws Exception {
+ @Test
+ void testStreamReadEmptyTablePath() throws Exception {
// create an empty table
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
StreamerUtil.initTableIfNotExists(conf);