This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 04326111808a fix: allows eager failure from abnormals for streaming
write (#12150)
04326111808a is described below
commit 04326111808a5bc80f8cb2d5da2f75fa3dcf2091
Author: fhan <[email protected]>
AuthorDate: Sat Jan 31 12:59:56 2026 +0800
fix: allows eager failure from abnormals for streaming write (#12150)
* apply hoodie.write.ignore.failed when writing data fails;
* the option defaults to true in the Hudi write config for backward
compatibility;
* the option defaults to false for Flink streaming ingestion;
* fix the coordinator commit path to remove the rollback; the instant is
now committed regardless of the write errors.
---------
Co-authored-by: fhan <[email protected]>
Co-authored-by: danny0405 <[email protected]>
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 19 ++++++
.../java/org/apache/hudi/io/BaseCreateHandle.java | 8 ++-
.../hudi/io/FileGroupReaderBasedMergeHandle.java | 4 ++
.../org/apache/hudi/io/HoodieAppendHandle.java | 10 ++-
.../org/apache/hudi/io/HoodieWriteMergeHandle.java | 4 ++
.../io/storage/row/HoodieRowDataCreateHandle.java | 5 ++
.../hudi/io/storage/row/HoodieRowCreateHandle.java | 4 ++
.../apache/hudi/configuration/FlinkOptions.java | 1 +
.../hudi/sink/StreamWriteOperatorCoordinator.java | 74 +++++++++-------------
.../org/apache/hudi/util/FlinkWriteClients.java | 3 +-
10 files changed, 82 insertions(+), 50 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index dd5148b33981..beda2612b275 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -920,6 +920,13 @@ public class HoodieWriteConfig extends HoodieConfig {
+ " or when using a custom Hoodie Concat Handle Implementation
controlled by the config " + CONCAT_HANDLE_CLASS_NAME.key()
+ ", enabling this config results in fallback to the default
implementations if instantiation of the custom implementation fails");
+ public static final ConfigProperty<Boolean> IGNORE_FAILED = ConfigProperty
+ .key("hoodie.write.ignore.failed")
+ .defaultValue(true)
+ .sinceVersion("")
+ .withDocumentation("Flag to indicate whether to ignore any non exception
error (e.g. write status error)."
+ + "By default true for backward compatibility.");
+
/**
* Config key with boolean value that indicates whether record being written
during MERGE INTO Spark SQL
* operation are already prepped.
@@ -2924,6 +2931,13 @@ public class HoodieWriteConfig extends HoodieConfig {
}
}
+ /**
+ * Whether to ignore the write failed.
+ */
+ public boolean getIgnoreWriteFailed() {
+ return getBooleanOrDefault(IGNORE_FAILED);
+ }
+
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -3505,6 +3519,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withWriteIgnoreFailed(boolean ignoreFailedWriteData) {
+ writeConfig.setValue(IGNORE_FAILED,
String.valueOf(ignoreFailedWriteData));
+ return this;
+ }
+
protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE,
getDefaultMarkersType(engineType));
// Check for mandatory properties
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/BaseCreateHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/BaseCreateHandle.java
index 38d4438b6f32..8144ae4c2f85 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/BaseCreateHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/BaseCreateHandle.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.storage.StoragePath;
@@ -116,10 +117,11 @@ public abstract class BaseCreateHandle<T, I, K, O>
extends HoodieWriteHandle<T,
// record successful.
record.deflate();
} catch (Throwable t) {
- // Not throwing exception from here, since we don't want to fail the
entire job
- // for a single record
- writeStatus.markFailure(record, t, recordMetadata);
log.error("Error writing record " + record, t);
+ if (!config.getIgnoreWriteFailed()) {
+ throw new HoodieException(t.getMessage(), t);
+ }
+ writeStatus.markFailure(record, t, recordMetadata);
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
index 0ca47c44422d..90cf3f6ab2a6 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
@@ -47,6 +47,7 @@ import org.apache.hudi.common.table.read.HoodieReadStats;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
@@ -290,6 +291,9 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O>
extends HoodieWriteMerg
recordsWritten++;
} catch (Exception e) {
log.error("Error writing record {}", record, e);
+ if (!config.getIgnoreWriteFailed()) {
+ throw new HoodieException(e.getMessage(), e);
+ }
writeStatus.markFailure(record, e, recordMetadata);
fileGroupReader.onWriteFailure(record.getRecordKey());
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 2d0ca0157300..5ea8ba460f87 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -316,6 +316,9 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
hoodieRecord.deflate();
} catch (Exception e) {
log.error("Error writing record {}", hoodieRecord, e);
+ if (!config.getIgnoreWriteFailed()) {
+ throw new HoodieException(e.getMessage(), e);
+ }
writeStatus.markFailure(hoodieRecord, e, recordMetadata);
}
}
@@ -526,10 +529,11 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
flushToDiskIfRequired(record, false);
writeToBuffer(record);
} catch (Throwable t) {
- // Not throwing exception from here, since we don't want to fail the
entire job
- // for a single record
- writeStatus.markFailure(record, t, recordMetadata);
log.error("Error writing record " + record, t);
+ if (!config.getIgnoreWriteFailed()) {
+ throw new HoodieException(t.getMessage(), t);
+ }
+ writeStatus.markFailure(record, t, recordMetadata);
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
index 0bdf76bb3578..64d214d20717 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCorruptedDataException;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -343,6 +344,9 @@ public class HoodieWriteMergeHandle<T, I, K, O> extends
HoodieAbstractMergeHandl
return true;
} catch (Exception e) {
log.error("Error writing record {}", newRecord, e);
+ if (!config.getIgnoreWriteFailed()) {
+ throw new HoodieException(e.getMessage(), e);
+ }
writeStatus.markFailure(newRecord, e, recordMetadata);
}
return false;
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index f54254baebfb..eb4954cc6b3b 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.storage.HoodieStorage;
@@ -150,6 +151,10 @@ public class HoodieRowDataCreateHandle implements
Serializable {
? HoodieRecordDelegate.create(recordKey, partitionPath, null,
newRecordLocation) : null;
writeStatus.markSuccess(recordDelegate, Option.empty());
} catch (Throwable t) {
+ log.error("Error writing record " + record, t);
+ if (!writeConfig.getIgnoreWriteFailed()) {
+ throw new HoodieException(t.getMessage(), t);
+ }
writeStatus.markFailure(recordKey, partitionPath, t);
}
} catch (Throwable ge) {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index f306621a0e95..0222506f56fa 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -194,6 +194,10 @@ public class HoodieRowCreateHandle implements Serializable
{
? HoodieRecordDelegate.create(recordKey.toString(),
partitionPath.toString(), null, newRecordLocation) : null;
writeStatus.markSuccess(recordDelegate, Option.empty());
} catch (Exception t) {
+ log.error("Error writing record " + row, t);
+ if (!writeConfig.getIgnoreWriteFailed()) {
+ throw new HoodieException(t.getMessage(), t);
+ }
writeStatus.markFailure(recordKey.toString(),
partitionPath.toString(), t);
}
} catch (Exception e) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 7ce83e737324..2e7c017225ee 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -559,6 +559,7 @@ public class FlinkOptions extends HoodieConfig {
.key("write.ignore.failed")
.booleanType()
.defaultValue(false)
+ .withFallbackKeys("hoodie.write.ignore.failed")
.withDescription("Flag to indicate whether to ignore any non exception
error (e.g. writestatus error). within a checkpoint batch. \n"
+ "By default false. Turning this on, could hide the write status
errors while the flink checkpoint moves ahead. \n"
+ "So, would recommend users to use this with caution.");
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 107496834d06..d12dccd1195d 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -619,53 +619,41 @@ public class StreamWriteOperatorCoordinator
*/
@SuppressWarnings("unchecked")
private void doCommit(long checkpointId, String instant, List<WriteStatus>
dataWriteResults, List<WriteStatus> indexWriteResults) {
- // commit or rollback
+ // commit and error logging
+ HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+ StreamerUtil.addFlinkCheckpointIdIntoMetaData(conf,
checkpointCommitMetadata, checkpointId);
+ final Map<String, List<String>> partitionToReplacedFileIds =
tableState.isOverwrite
+ ? writeClient.getPartitionToReplacedFileIds(tableState.operationType,
dataWriteResults)
+ : Collections.emptyMap();
+ List<WriteStatus> allWriteStatus =
Stream.concat(dataWriteResults.stream(),
indexWriteResults.stream()).collect(Collectors.toList());
+ boolean success = writeClient.commit(instant, allWriteStatus,
Option.of(checkpointCommitMetadata),
+ tableState.commitAction, partitionToReplacedFileIds);
+ if (success) {
+ this.eventBuffers.reset(checkpointId);
+ log.info("Commit instant [{}] success!", instant);
+ } else {
+ throw new HoodieException(String.format("Commit instant [%s] failed!",
instant));
+ }
+
long totalErrorRecords =
dataWriteResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
long totalRecords =
dataWriteResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
boolean hasErrors = totalErrorRecords > 0;
- if (!hasErrors || this.conf.get(FlinkOptions.IGNORE_FAILED)) {
- HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
- StreamerUtil.addFlinkCheckpointIdIntoMetaData(conf,
checkpointCommitMetadata, checkpointId);
-
- if (hasErrors) {
- log.warn("Some records failed to merge but forcing commit since
commitOnErrors set to true. Errors/Total={}/{}",
- totalErrorRecords, totalRecords);
- }
-
- final Map<String, List<String>> partitionToReplacedFileIds =
tableState.isOverwrite
- ?
writeClient.getPartitionToReplacedFileIds(tableState.operationType,
dataWriteResults)
- : Collections.emptyMap();
- List<WriteStatus> allWriteStatus =
Stream.concat(dataWriteResults.stream(),
indexWriteResults.stream()).collect(Collectors.toList());
- boolean success = writeClient.commit(instant, allWriteStatus,
Option.of(checkpointCommitMetadata),
- tableState.commitAction, partitionToReplacedFileIds);
- if (success) {
- this.eventBuffers.reset(checkpointId);
- log.info("Commit instant [{}] success!", instant);
- } else {
- throw new HoodieException(String.format("Commit instant [%s] failed!",
instant));
- }
- } else {
- if (log.isErrorEnabled()) {
- log.error("Error when writing. Errors/Total={}/{}", totalErrorRecords,
totalRecords);
- log.error("The first 10 files with write errors:");
-
dataWriteResults.stream().filter(WriteStatus::hasErrors).limit(10).forEach(ws
-> {
- if (ws.getGlobalError() != null) {
- log.error("Global error for partition path {} and fileID {}: {}",
- ws.getPartitionPath(), ws.getFileId(), ws.getGlobalError());
- }
- if (!ws.getErrors().isEmpty()) {
- log.error("The first 100 records-level errors for partition path
{} and fileID {}:",
- ws.getPartitionPath(), ws.getFileId());
- ws.getErrors().entrySet().stream().limit(100).forEach(entry ->
- log.error("Error for key: {} and Exception: {}",
entry.getKey(), entry.getValue().getMessage()));
- }
- });
- }
-
- // Rolls back instant
- writeClient.rollback(instant);
- throw new HoodieException(String.format("Commit instant [%s] failed and
rolled back !", instant));
+ if (hasErrors && log.isErrorEnabled()) {
+ log.error("Error when writing. Errors/Total={}/{}", totalErrorRecords,
totalRecords);
+ log.error("The first 10 files with write errors:");
+
dataWriteResults.stream().filter(WriteStatus::hasErrors).limit(10).forEach(ws
-> {
+ if (ws.getGlobalError() != null) {
+ log.error("Global error for partition path {} and fileID {}: {}",
+ ws.getPartitionPath(), ws.getFileId(), ws.getGlobalError());
+ }
+ if (!ws.getErrors().isEmpty()) {
+ log.error("The first 100 records-level errors for partition path {}
and fileID {}:",
+ ws.getPartitionPath(), ws.getFileId());
+ ws.getErrors().entrySet().stream().limit(100).forEach(entry ->
+ log.error("Error for key: {} and Exception: {}", entry.getKey(),
entry.getValue().getMessage()));
+ }
+ });
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 97fe790fa337..499137acaf9b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -242,7 +242,8 @@ public class FlinkWriteClients {
.withEmbeddedTimelineServerReuseEnabled(true) // make write client
embedded timeline service singleton
.withAllowOperationMetadataField(conf.get(FlinkOptions.CHANGELOG_ENABLED))
.withProps(flinkConf2TypedProperties(conf))
- .withSchema(getSourceSchema(conf).toString());
+ .withSchema(getSourceSchema(conf).toString())
+ .withWriteIgnoreFailed(conf.get(FlinkOptions.IGNORE_FAILED));
// <merge_mode, payload_class, merge_strategy_id>
Triple<RecordMergeMode, String, String> mergingBehavior =
StreamerUtil.inferMergingBehavior(conf);