This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 04326111808a fix: allows eager failure from abnormals for streaming 
write (#12150)
04326111808a is described below

commit 04326111808a5bc80f8cb2d5da2f75fa3dcf2091
Author: fhan <[email protected]>
AuthorDate: Sat Jan 31 12:59:56 2026 +0800

    fix: allows eager failure from abnormals for streaming write (#12150)
    
    * apply hoodie.write.ignore.failed when writing data fails;
    * the option defaults to true in the Hudi write config for backward compatibility;
    * the option defaults to false for Flink streaming ingestion;
    * fix the coordinator commit to remove the rollback; the instant is now committed regardless of write errors.
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
    Co-authored-by: danny0405 <[email protected]>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 19 ++++++
 .../java/org/apache/hudi/io/BaseCreateHandle.java  |  8 ++-
 .../hudi/io/FileGroupReaderBasedMergeHandle.java   |  4 ++
 .../org/apache/hudi/io/HoodieAppendHandle.java     | 10 ++-
 .../org/apache/hudi/io/HoodieWriteMergeHandle.java |  4 ++
 .../io/storage/row/HoodieRowDataCreateHandle.java  |  5 ++
 .../hudi/io/storage/row/HoodieRowCreateHandle.java |  4 ++
 .../apache/hudi/configuration/FlinkOptions.java    |  1 +
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 74 +++++++++-------------
 .../org/apache/hudi/util/FlinkWriteClients.java    |  3 +-
 10 files changed, 82 insertions(+), 50 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index dd5148b33981..beda2612b275 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -920,6 +920,13 @@ public class HoodieWriteConfig extends HoodieConfig {
           + " or when using a custom Hoodie Concat Handle Implementation 
controlled by the config " + CONCAT_HANDLE_CLASS_NAME.key()
               + ", enabling this config results in fallback to the default 
implementations if instantiation of the custom implementation fails");
 
+  public static final ConfigProperty<Boolean> IGNORE_FAILED = ConfigProperty
+      .key("hoodie.write.ignore.failed")
+      .defaultValue(true)
+      .sinceVersion("")
+      .withDocumentation("Flag to indicate whether to ignore any non exception 
error (e.g. write status error)."
+          + "By default true for backward compatibility.");
+
   /**
    * Config key with boolean value that indicates whether record being written 
during MERGE INTO Spark SQL
    * operation are already prepped.
@@ -2924,6 +2931,13 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
   }
 
+  /**
+   * Whether to ignore the write failed.
+   */
+  public boolean getIgnoreWriteFailed() {
+    return getBooleanOrDefault(IGNORE_FAILED);
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -3505,6 +3519,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withWriteIgnoreFailed(boolean ignoreFailedWriteData) {
+      writeConfig.setValue(IGNORE_FAILED, 
String.valueOf(ignoreFailedWriteData));
+      return this;
+    }
+
     protected void setDefaults() {
       writeConfig.setDefaultValue(MARKERS_TYPE, 
getDefaultMarkersType(engineType));
       // Check for mandatory properties
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/BaseCreateHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/BaseCreateHandle.java
index 38d4438b6f32..8144ae4c2f85 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/BaseCreateHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/BaseCreateHandle.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.storage.StoragePath;
@@ -116,10 +117,11 @@ public abstract class BaseCreateHandle<T, I, K, O> 
extends HoodieWriteHandle<T,
       // record successful.
       record.deflate();
     } catch (Throwable t) {
-      // Not throwing exception from here, since we don't want to fail the 
entire job
-      // for a single record
-      writeStatus.markFailure(record, t, recordMetadata);
       log.error("Error writing record " + record, t);
+      if (!config.getIgnoreWriteFailed()) {
+        throw new HoodieException(t.getMessage(), t);
+      }
+      writeStatus.markFailure(record, t, recordMetadata);
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
index 0ca47c44422d..90cf3f6ab2a6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
@@ -47,6 +47,7 @@ import org.apache.hudi.common.table.read.HoodieReadStats;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
@@ -290,6 +291,9 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O> 
extends HoodieWriteMerg
             recordsWritten++;
           } catch (Exception e) {
             log.error("Error writing record {}", record, e);
+            if (!config.getIgnoreWriteFailed()) {
+              throw new HoodieException(e.getMessage(), e);
+            }
             writeStatus.markFailure(record, e, recordMetadata);
             fileGroupReader.onWriteFailure(record.getRecordKey());
           }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 2d0ca0157300..5ea8ba460f87 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -316,6 +316,9 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
       hoodieRecord.deflate();
     } catch (Exception e) {
       log.error("Error writing record {}", hoodieRecord, e);
+      if (!config.getIgnoreWriteFailed()) {
+        throw new HoodieException(e.getMessage(), e);
+      }
       writeStatus.markFailure(hoodieRecord, e, recordMetadata);
     }
   }
@@ -526,10 +529,11 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
       flushToDiskIfRequired(record, false);
       writeToBuffer(record);
     } catch (Throwable t) {
-      // Not throwing exception from here, since we don't want to fail the 
entire job
-      // for a single record
-      writeStatus.markFailure(record, t, recordMetadata);
       log.error("Error writing record " + record, t);
+      if (!config.getIgnoreWriteFailed()) {
+        throw new HoodieException(t.getMessage(), t);
+      }
+      writeStatus.markFailure(record, t, recordMetadata);
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
index 0bdf76bb3578..64d214d20717 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteMergeHandle.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.storage.HoodieFileReader;
@@ -343,6 +344,9 @@ public class HoodieWriteMergeHandle<T, I, K, O> extends 
HoodieAbstractMergeHandl
       return true;
     } catch (Exception e) {
       log.error("Error writing record {}", newRecord, e);
+      if (!config.getIgnoreWriteFailed()) {
+        throw new HoodieException(e.getMessage(), e);
+      }
       writeStatus.markFailure(newRecord, e, recordMetadata);
     }
     return false;
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index f54254baebfb..eb4954cc6b3b 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.storage.HoodieStorage;
@@ -150,6 +151,10 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
             ? HoodieRecordDelegate.create(recordKey, partitionPath, null, 
newRecordLocation) : null;
         writeStatus.markSuccess(recordDelegate, Option.empty());
       } catch (Throwable t) {
+        log.error("Error writing record " + record, t);
+        if (!writeConfig.getIgnoreWriteFailed()) {
+          throw new HoodieException(t.getMessage(), t);
+        }
         writeStatus.markFailure(recordKey, partitionPath, t);
       }
     } catch (Throwable ge) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index f306621a0e95..0222506f56fa 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -194,6 +194,10 @@ public class HoodieRowCreateHandle implements Serializable 
{
             ? HoodieRecordDelegate.create(recordKey.toString(), 
partitionPath.toString(), null, newRecordLocation) : null;
         writeStatus.markSuccess(recordDelegate, Option.empty());
       } catch (Exception t) {
+        log.error("Error writing record " + row, t);
+        if (!writeConfig.getIgnoreWriteFailed()) {
+          throw new HoodieException(t.getMessage(), t);
+        }
         writeStatus.markFailure(recordKey.toString(), 
partitionPath.toString(), t);
       }
     } catch (Exception e) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 7ce83e737324..2e7c017225ee 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -559,6 +559,7 @@ public class FlinkOptions extends HoodieConfig {
       .key("write.ignore.failed")
       .booleanType()
       .defaultValue(false)
+      .withFallbackKeys("hoodie.write.ignore.failed")
       .withDescription("Flag to indicate whether to ignore any non exception 
error (e.g. writestatus error). within a checkpoint batch. \n"
           + "By default false. Turning this on, could hide the write status 
errors while the flink checkpoint moves ahead. \n"
           + "So, would recommend users to use this with caution.");
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 107496834d06..d12dccd1195d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -619,53 +619,41 @@ public class StreamWriteOperatorCoordinator
    */
   @SuppressWarnings("unchecked")
   private void doCommit(long checkpointId, String instant, List<WriteStatus> 
dataWriteResults, List<WriteStatus> indexWriteResults) {
-    // commit or rollback
+    // commit and error logging
+    HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+    StreamerUtil.addFlinkCheckpointIdIntoMetaData(conf, 
checkpointCommitMetadata, checkpointId);
+    final Map<String, List<String>> partitionToReplacedFileIds = 
tableState.isOverwrite
+        ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, 
dataWriteResults)
+        : Collections.emptyMap();
+    List<WriteStatus> allWriteStatus = 
Stream.concat(dataWriteResults.stream(), 
indexWriteResults.stream()).collect(Collectors.toList());
+    boolean success = writeClient.commit(instant, allWriteStatus, 
Option.of(checkpointCommitMetadata),
+        tableState.commitAction, partitionToReplacedFileIds);
+    if (success) {
+      this.eventBuffers.reset(checkpointId);
+      log.info("Commit instant [{}] success!", instant);
+    } else {
+      throw new HoodieException(String.format("Commit instant [%s] failed!", 
instant));
+    }
+
     long totalErrorRecords = 
dataWriteResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
     long totalRecords = 
dataWriteResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
     boolean hasErrors = totalErrorRecords > 0;
 
-    if (!hasErrors || this.conf.get(FlinkOptions.IGNORE_FAILED)) {
-      HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
-      StreamerUtil.addFlinkCheckpointIdIntoMetaData(conf, 
checkpointCommitMetadata, checkpointId);
-
-      if (hasErrors) {
-        log.warn("Some records failed to merge but forcing commit since 
commitOnErrors set to true. Errors/Total={}/{}",
-            totalErrorRecords, totalRecords);
-      }
-
-      final Map<String, List<String>> partitionToReplacedFileIds = 
tableState.isOverwrite
-          ? 
writeClient.getPartitionToReplacedFileIds(tableState.operationType, 
dataWriteResults)
-          : Collections.emptyMap();
-      List<WriteStatus> allWriteStatus = 
Stream.concat(dataWriteResults.stream(), 
indexWriteResults.stream()).collect(Collectors.toList());
-      boolean success = writeClient.commit(instant, allWriteStatus, 
Option.of(checkpointCommitMetadata),
-          tableState.commitAction, partitionToReplacedFileIds);
-      if (success) {
-        this.eventBuffers.reset(checkpointId);
-        log.info("Commit instant [{}] success!", instant);
-      } else {
-        throw new HoodieException(String.format("Commit instant [%s] failed!", 
instant));
-      }
-    } else {
-      if (log.isErrorEnabled()) {
-        log.error("Error when writing. Errors/Total={}/{}", totalErrorRecords, 
totalRecords);
-        log.error("The first 10 files with write errors:");
-        
dataWriteResults.stream().filter(WriteStatus::hasErrors).limit(10).forEach(ws 
-> {
-          if (ws.getGlobalError() != null) {
-            log.error("Global error for partition path {} and fileID {}: {}",
-                ws.getPartitionPath(), ws.getFileId(), ws.getGlobalError());
-          }
-          if (!ws.getErrors().isEmpty()) {
-            log.error("The first 100 records-level errors for partition path 
{} and fileID {}:",
-                ws.getPartitionPath(), ws.getFileId());
-            ws.getErrors().entrySet().stream().limit(100).forEach(entry ->
-                log.error("Error for key: {} and Exception: {}", 
entry.getKey(), entry.getValue().getMessage()));
-          }
-        });
-      }
-
-      // Rolls back instant
-      writeClient.rollback(instant);
-      throw new HoodieException(String.format("Commit instant [%s] failed and 
rolled back !", instant));
+    if (hasErrors && log.isErrorEnabled()) {
+      log.error("Error when writing. Errors/Total={}/{}", totalErrorRecords, 
totalRecords);
+      log.error("The first 10 files with write errors:");
+      
dataWriteResults.stream().filter(WriteStatus::hasErrors).limit(10).forEach(ws 
-> {
+        if (ws.getGlobalError() != null) {
+          log.error("Global error for partition path {} and fileID {}: {}",
+              ws.getPartitionPath(), ws.getFileId(), ws.getGlobalError());
+        }
+        if (!ws.getErrors().isEmpty()) {
+          log.error("The first 100 records-level errors for partition path {} 
and fileID {}:",
+              ws.getPartitionPath(), ws.getFileId());
+          ws.getErrors().entrySet().stream().limit(100).forEach(entry ->
+              log.error("Error for key: {} and Exception: {}", entry.getKey(), 
entry.getValue().getMessage()));
+        }
+      });
     }
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 97fe790fa337..499137acaf9b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -242,7 +242,8 @@ public class FlinkWriteClients {
             .withEmbeddedTimelineServerReuseEnabled(true) // make write client 
embedded timeline service singleton
             
.withAllowOperationMetadataField(conf.get(FlinkOptions.CHANGELOG_ENABLED))
             .withProps(flinkConf2TypedProperties(conf))
-            .withSchema(getSourceSchema(conf).toString());
+            .withSchema(getSourceSchema(conf).toString())
+            .withWriteIgnoreFailed(conf.get(FlinkOptions.IGNORE_FAILED));
 
     // <merge_mode, payload_class, merge_strategy_id>
     Triple<RecordMergeMode, String, String> mergingBehavior = 
StreamerUtil.inferMergingBehavior(conf);

Reply via email to