This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 328e259d4a7 [HUDI-9417] Add validation for handling write stats during 
commit (#13307)
328e259d4a7 is described below

commit 328e259d4a7bc284acd9417fe8461e34efdf8a78
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jun 6 22:25:37 2025 +0530

    [HUDI-9417] Add validation for handling write stats during commit (#13307)
---
 .../hudi/callback/common/WriteStatusValidator.java |  44 +++++
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   6 +-
 .../java/org/apache/hudi/client/WriteStatus.java   |  12 +-
 .../hudi/client/TestBaseHoodieWriteClient.java     |   3 +-
 .../apache/hudi/config/TestHoodieWriteConfig.java  |   1 +
 .../metadata/TestHoodieMetadataWriteUtils.java     |   3 +-
 .../hudi/utils/HoodieWriterClientTestHarness.java  |   1 -
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   4 +-
 .../apache/hudi/client/HoodieJavaWriteClient.java  |   7 +-
 .../hudi/client/SparkRDDMetadataWriteClient.java   |  30 ++-
 .../apache/hudi/client/SparkRDDWriteClient.java    | 112 ++++++++++-
 ...ieBackedTableMetadataWriterTableVersionSix.java |   3 +-
 .../client/TestSparkRDDMetadataWriteClient.java    |   2 +-
 .../org/apache/hudi/client/TestWriteStatus.java    |   3 +-
 .../hudi/testutils/HoodieClientTestBase.java       |   2 +-
 .../hudi/common/config/HoodieMetadataConfig.java   |   2 +-
 .../apache/hudi/common/model/HoodieWriteStat.java  |   1 +
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   4 +-
 .../hudi/common/model/TestHoodieWriteStat.java     |   2 +-
 .../main/java/org/apache/hudi/DataSourceUtils.java |  50 +++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  43 ++---
 .../hudi/utilities/streamer/HoodieStreamer.java    |   7 +-
 .../apache/hudi/utilities/streamer/StreamSync.java | 211 ++++++++++++++-------
 23 files changed, 428 insertions(+), 125 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/WriteStatusValidator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/WriteStatusValidator.java
new file mode 100644
index 00000000000..d1c7a121f28
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/WriteStatusValidator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.common;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.util.Option;
+
+/**
+ * WriteStatus validator to assist caller to process errors if any. Caller can 
dictate whether to proceed with the commit by means of the return
+ * value of the method {@link WriteStatusValidator#validate}.
+ *
+ * <p>Sometimes callers invoke the dag just to check whether there are any errors 
before proceeding with the commit.
+ * This hook function is introduced to avoid additional dag triggers from 
the caller's side.
+ */
+public interface WriteStatusValidator {
+
+  /**
+   * Validates the given write status.
+   *
+   * @param totalRecords        Total records in this inflight commit.
+   * @param totalErroredRecords Total error records in this inflight commit.
+   * @param writeStatusesOpt    List of {@link WriteStatus} for the data table 
writes for this inflight commit.
+   *
+   * @return True if the commit can proceed
+   */
+  boolean validate(long totalRecords, long totalErroredRecords, 
Option<HoodieData<WriteStatus>> writeStatusesOpt);
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 7a298e6ae45..e4751140599 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.callback.HoodieWriteCommitCallback;
 import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.common.WriteStatusValidator;
 import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
@@ -221,12 +222,13 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
   public boolean commit(String instantTime, O writeStatuses, 
Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds) {
     return commit(instantTime, writeStatuses, extraMetadata, commitActionType, 
partitionToReplacedFileIds,
-        Option.empty());
+        Option.empty(), Option.empty());
   }
 
   public abstract boolean commit(String instantTime, O writeStatuses, 
Option<Map<String, String>> extraMetadata,
                                  String commitActionType, Map<String, 
List<String>> partitionToReplacedFileIds,
-                                 Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc);
+                                 Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                                 Option<WriteStatusValidator> 
writeStatusValidatorOpt);
 
   public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, 
Option<Map<String, String>> extraMetadata,
                              String commitActionType) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index 3477398d7d8..7d355a67418 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -189,12 +189,18 @@ public class WriteStatus implements Serializable {
     totalErrorRecords++;
   }
 
-  public void removeMetadataStats() {
+  public WriteStatus removeMetadataIndexStatsAndErrorRecordsTracking() {
+    removeMetadataStats();
+    dropGranularErrorRecordsTracking();
+    return this;
+  }
+
+  public WriteStatus removeMetadataStats() {
     this.writtenRecordDelegates.clear();
-    this.stat.removeRecordStats();
+    return this;
   }
 
-  public void dropErrorRecords() {
+  public void dropGranularErrorRecordsTracking() {
     failedRecords.clear();
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
index 04e3ab8fe26..a198afb8fc6 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.callback.common.WriteStatusValidator;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
@@ -172,7 +173,7 @@ class TestBaseHoodieWriteClient extends 
HoodieCommonTestHarness {
 
     @Override
     public boolean commit(String instantTime, String writeStatuses, 
Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, 
List<String>> partitionToReplacedFileIds,
-                          Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+                          Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc, Option<WriteStatusValidator> 
writeStatusValidatorOpt) {
       return false;
     }
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 7d2563c5f2a..c9cc16f1c04 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -732,6 +732,7 @@ public class TestHoodieWriteConfig {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .withPath("/tmp")
         .withProperties(props)
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withStreamingWriteEnabled(true).build())
         .withEngineType(EngineType.SPARK).build();
 
     
assertTrue(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.EIGHT));
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
index ee1fd587e04..3b343d3dc12 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
@@ -73,10 +73,11 @@ public class TestHoodieMetadataWriteUtils {
   @Test
   public void testCreateMetadataWriteConfigForNBCC() {
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
-        .withPath("/tmp/base_path/.hoodie/metadata/")
+        .withPath("/tmp/base_path/")
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
             .retainCommits(5).build())
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withStreamingWriteEnabled(true).build())
         .build();
 
     HoodieWriteConfig metadataWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, 
HoodieFailedWritesCleaningPolicy.EAGER,
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index f250f31f333..dabdabdeb5e 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -786,7 +786,6 @@ public abstract class HoodieWriterClientTestHarness extends 
HoodieCommonTestHarn
                                                                        
Function2<HoodieTable, HoodieTableMetaClient, HoodieWriteConfig> 
getHoodieTableFn,
                                                                        
Function transformInputFn, Function transformOutputFn) throws Exception {
     String instantTime = "00000000000010";
-    HoodieTableMetaClient metaClient = createMetaClient();
     HoodieWriteConfig cfg = 
getRollbackMarkersAndConsistencyGuardWriteConfig(rollbackUsingMarkers, 
enableOptimisticConsistencyGuard, populateMetaFields);
     BaseHoodieWriteClient client = getHoodieWriteClient(cfg);
     testConsistencyCheck(context, metaClient, instantTime, 
enableOptimisticConsistencyGuard, getHoodieTableFn, transformInputFn, 
transformOutputFn);
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 3d9958d0f4d..39e481dff10 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.callback.common.WriteStatusValidator;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.data.HoodieListData;
@@ -101,7 +102,8 @@ public class HoodieFlinkWriteClient<T>
   @Override
   public boolean commit(String instantTime, List<WriteStatus> writeStatuses, 
Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusValidator> writeStatusValidatorOpt) {
     List<HoodieWriteStat> writeStats = 
writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
     // for eager flush, multiple write stat may share one file path.
     List<HoodieWriteStat> merged = writeStats.stream()
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 03362e0f383..c717b854cc8 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.callback.common.WriteStatusValidator;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -90,10 +91,10 @@ public class HoodieJavaWriteClient<T> extends
                         Option<Map<String, String>> extraMetadata,
                         String commitActionType,
                         Map<String, List<String>> partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusValidator> writeStatusValidatorOpt) {
     List<HoodieWriteStat> writeStats = 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-    return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds,
-        extraPreCommitFunc);
+    return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
index f3f897d3915..c13624fd867 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
@@ -18,13 +18,17 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.callback.common.WriteStatusValidator;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -35,8 +39,12 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
 
 /**
  * Write client to assist with writing to metadata table.
@@ -45,6 +53,8 @@ import java.util.List;
  */
 public class SparkRDDMetadataWriteClient<T> extends SparkRDDWriteClient<T> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkRDDMetadataWriteClient.class);
+
   // tracks the instants for which upsertPrepped is invoked.
   private Option<String> firstInstantOpt = Option.empty();
   private int invocationCounts = 0;
@@ -63,10 +73,26 @@ public class SparkRDDMetadataWriteClient<T> extends 
SparkRDDWriteClient<T> {
     return TimelineUtils.generateInstantTime(false, timeGenerator);
   }
 
+  @Override
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusValidator> writeStatusValidatorOpt) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " 
+ config.getTableName());
+    // for metadata table, we don't have any write status validator, since we 
use FailOnFirstErrorWriteStatus as the write status class.
+    ValidationUtils.checkArgument(!writeStatusValidatorOpt.isPresent(), 
"Metadata table is not expected to contain write status validator");
+    // Triggering the dag for writes to metadata table.
+    // When streaming writes are enabled, writes to metadata may not call this 
method as the caller tightly controls the dag de-referencing.
+    // Even then, to initialize a new partition in Metadata table and for 
non-incremental operations like insert_overwrite, etc., writes to metadata table
+    // will invoke this commit method.
+    List<HoodieWriteStat> hoodieWriteStats = writeStatuses.map(writeStatus -> 
writeStatus.getStat()).collect();
+    return commitStats(instantTime, hoodieWriteStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+  }
+
   /**
    * Upserts the given prepared records into the Hoodie table, at the supplied 
instantTime.
-   * <p>
-   * This implementation requires that the input records are already tagged, 
and de-duped if needed.
+   *
+   * <p>This implementation requires that the input records are already 
tagged, and de-duped if needed.
    *
    * @param preppedRecords Prepared HoodieRecords to upsert
    * @param instantTime    Instant time of the commit
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 2331596b0eb..23af3fd4441 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.callback.common.WriteStatusValidator;
 import org.apache.hudi.index.HoodieSparkIndexClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
@@ -57,9 +58,12 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 @SuppressWarnings("checkstyle:LineLength")
 public class SparkRDDWriteClient<T> extends
@@ -82,16 +86,58 @@ public class SparkRDDWriteClient<T> extends
     return SparkHoodieIndexFactory.createIndex(config);
   }
 
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+    return commit(instantTime, writeStatuses, extraMetadata, commitActionType, 
partitionToReplacedFileIds, extraPreCommitFunc, Option.empty());
+  }
+
   /**
    * Complete changes performed at the given instantTime marker with specified 
action.
    */
   @Override
   public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusValidator> writeStatusValidatorOpt) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " 
+ config.getTableName());
-    List<HoodieWriteStat> writeStats = 
writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+    // Triggering the dag for writes.
+    //
+    // 1. If streaming writes are enabled, writes to both data table and 
metadata table gets triggered at this juncture;
+    // 2. If not, writes to data table gets triggered here.
+    //
+    // When streaming writes are enabled, data table's WriteStatus is expected 
to contain all stats required to generate metadata table records and so each 
object will be larger.
+    // Here all additional stats and error records are dropped to retain only 
the required information and prevent collecting large objects on the driver.
+    List<SlimWriteStats> slimWriteStatsList = writeStatuses
+        .map(writeStatus -> new SlimWriteStats(writeStatus.isMetadataTable(), 
writeStatus.getTotalRecords(), writeStatus.getTotalErrorRecords(),
+            writeStatus.getStat())).collect();
+    // Compute stats for the data table writes and invoke callback
+    AtomicLong totalRecords = new AtomicLong(0);
+    AtomicLong totalErrorRecords = new AtomicLong(0);
+    // collect record stats for data table
+    slimWriteStatsList.stream().filter(slimWriteStats -> 
!slimWriteStats.isMetadataTable())
+        .forEach(slimWriteStats -> {
+          totalRecords.getAndAdd(slimWriteStats.getTotalRecords());
+          totalErrorRecords.getAndAdd(slimWriteStats.getTotalErrorRecords());
+        });
+    // Why passing RDD<WriteStatus> to the WriteStatus validator:
+    // At the beginning of this method, we drop all index stats and error 
records before collecting in the driver.
+    // Just in case if there are errors, caller might be interested to fetch 
error records in the validator where
+    // a complete collection of RDD<WriteStatus> is required.
+    boolean canProceed = writeStatusValidatorOpt.map(callback -> 
callback.validate(totalRecords.get(), totalErrorRecords.get(),
+            totalErrorRecords.get() > 0 ? 
Option.of(HoodieJavaRDD.of(writeStatuses.filter(status -> 
!status.isMetadataTable()).map(WriteStatus::removeMetadataStats))) : 
Option.empty()))
+        .orElse(true);
+
+    // Proceeds only if validator returns true, otherwise bails out.
+    if (canProceed) {
+      // when streaming writes are enabled, writeStatuses is a mix of data 
table write status and mdt write status
+      List<HoodieWriteStat> dataTableHoodieWriteStats = 
slimWriteStatsList.stream().filter(entry -> 
!entry.isMetadataTable()).map(SlimWriteStats::getWriteStat).collect(Collectors.toList());
+      return commitStats(instantTime, dataTableHoodieWriteStats, 
extraMetadata, commitActionType,
+          partitionToReplacedFileIds, extraPreCommitFunc);
+    } else {
+      LOG.error("Exiting early due to errors with write operation ");
+      return false;
+    }
   }
 
   @Override
@@ -344,4 +390,64 @@ public class SparkRDDWriteClient<T> extends
     super.releaseResources(instantTime);
     SparkReleaseResources.releaseCachedData(context, config, basePath, 
instantTime);
   }
+
+  /**
+   * Slim WriteStats to hold info like total records, total error records,
+   * HoodieWriteStat and whether the writeStatus is referring to metadata 
table or not.
+   */
+  static class SlimWriteStats implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private boolean isMetadataTable;
+    private long totalRecords;
+    private long totalErrorRecords;
+    private HoodieWriteStat writeStat;
+
+    public SlimWriteStats(boolean isMetadataTable, long totalRecords, long 
totalErrorRecords, HoodieWriteStat writeStat) {
+      this.isMetadataTable = isMetadataTable;
+      this.totalRecords = totalRecords;
+      this.totalErrorRecords = totalErrorRecords;
+      this.writeStat = writeStat;
+    }
+
+    public boolean isMetadataTable() {
+      return isMetadataTable;
+    }
+
+    public long getTotalRecords() {
+      return totalRecords;
+    }
+
+    public long getTotalErrorRecords() {
+      return totalErrorRecords;
+    }
+
+    public HoodieWriteStat getWriteStat() {
+      return writeStat;
+    }
+
+    // setter for efficient serialization,
+    // please do not remove it even if it is not used.
+    public void setMetadataTable(boolean metadataTable) {
+      isMetadataTable = metadataTable;
+    }
+
+    // setter for efficient serialization,
+    // please do not remove it even if it is not used.
+    public void setTotalRecords(long totalRecords) {
+      this.totalRecords = totalRecords;
+    }
+
+    // setter for efficient serialization,
+    // please do not remove it even if it is not used.
+    public void setTotalErrorRecords(long totalErrorRecords) {
+      this.totalErrorRecords = totalErrorRecords;
+    }
+
+    // setter for efficient serialization,
+    // please do not remove it even if it is not used.
+    public void setWriteStat(HoodieWriteStat writeStat) {
+      this.writeStat = writeStat;
+    }
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
index e49ea638269..e61643de930 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.metadata;
 
 import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.SparkRDDMetadataWriteClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -159,7 +160,7 @@ public class 
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
 
   @Override
   public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, 
JavaRDD<WriteStatus>> initializeWriteClient() {
-    return new SparkRDDWriteClient(engineContext, metadataWriteConfig, 
Option.empty());
+    return new SparkRDDMetadataWriteClient(engineContext, metadataWriteConfig, 
Option.empty());
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
index df742c51888..cf913ffff8d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
@@ -94,7 +94,7 @@ public class TestSparkRDDMetadataWriteClient extends 
HoodieClientTestBase {
 
     HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
         
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
-            .withRecordIndexFileGroupCount(1, 1).build()).build();
+            .withRecordIndexFileGroupCount(1, 
1).withStreamingWriteEnabled(true).build()).build();
 
     // trigger end to end write to data table so that metadata table is also 
initialized.
     initDataTableWithACommit(hoodieWriteConfig);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
index 757620172f8..a41aa292be7 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
@@ -220,7 +220,6 @@ public class TestWriteStatus {
     // Remove metadata stats
     status.removeMetadataStats();
     assertEquals(0, status.getWrittenRecordDelegates().size());
-    assertTrue(status.getStat().getColumnStats().isEmpty());
   }
 
   @Test
@@ -230,7 +229,7 @@ public class TestWriteStatus {
     assertEquals(1, status.getFailedRecords().size());
 
     // Drop error records
-    status.dropErrorRecords();
+    status.dropGranularErrorRecordsTracking();
     assertEquals(0, status.getFailedRecords().size());
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 395d340a9b2..2d8623552e3 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -530,7 +530,7 @@ public class HoodieClientTestBase extends 
HoodieSparkClientTestHarness {
     List<WriteStatus> statusList = writeFn.apply(client, writeRecords, 
newCommitTime).collect();
     JavaRDD<WriteStatus> result = jsc.parallelize(statusList, 1);
     assertNoWriteErrors(statusList);
-    // validate #isMetadataTable() in write status
+    // validate isMetadataTable() in write status
     statusList.forEach(writeStatus -> 
assertFalse(writeStatus.isMetadataTable()));
 
     if (!skipCommit) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 5de0600ccd5..dfc1bb896b6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -981,7 +981,7 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
     private boolean getDefaultForStreamingWriteEnabled(EngineType engineType) {
       switch (engineType) {
         case SPARK:
-          return true;
+          return false; // we will flip this to true in future patches.
         case FLINK:
         case JAVA:
           return false;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 1f819a59041..820a255853b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -282,6 +282,7 @@ public class HoodieWriteStat extends HoodieReadStats {
   }
 
   // keep for serialization efficiency
+  // Please do not remove this method even if it appears unused anywhere.
   public void setRecordsStats(Map<String, 
HoodieColumnRangeMetadata<Comparable>> stats) {
     recordsStats = Option.of(stats);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 82f25c76f85..f507a115979 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1609,8 +1609,8 @@ public class HoodieTableMetadataUtil {
   private static Stream<HoodieRecord> 
translateWriteStatToColumnStats(HoodieWriteStat writeStat,
                                                                       
HoodieTableMetaClient datasetMetaClient,
                                                                       
List<String> columnsToIndex) {
-    if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) 
writeStat).getColumnStats().isPresent()) {
-      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = 
((HoodieDeltaWriteStat) writeStat).getColumnStats().get();
+    if (writeStat.getColumnStats().isPresent()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = 
writeStat.getColumnStats().get();
       Collection<HoodieColumnRangeMetadata<Comparable>> 
columnRangeMetadataList = columnRangeMap.values();
       return 
HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), 
columnRangeMetadataList, false);
     }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
index bb159ede971..17bbd73ddc3 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
@@ -148,7 +148,7 @@ public class TestHoodieWriteStat {
     clonedInput.putAll(columnRangeMetadataMap);
 
     HoodieWriteStat writeStat = new HoodieWriteStat();
-    writeStat.setRecordsStats(clonedInput);
+    writeStat.putRecordsStats(clonedInput);
 
     Map<String, HoodieColumnRangeMetadata<Comparable>> actualRecordStats = 
writeStat.getColumnStats().get();
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index d0d5619687c..5eec6df1130 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -18,10 +18,13 @@
 
 package org.apache.hudi;
 
+import org.apache.hudi.callback.common.WriteStatusValidator;
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieEmptyRecord;
@@ -35,9 +38,11 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieDuplicateKeyException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
@@ -57,6 +62,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
@@ -344,4 +350,48 @@ public class DataSourceUtils {
     return handleDuplicates(
         new HoodieSparkEngineContext(jssc), incomingHoodieRecords, 
writeConfig, failOnDuplicates);
   }
+
+  /**
+   * Spark data source WriteStatus validator.
+   *
+   * <ul>
+   *   <li>If there are error records, prints a few of them and fails the validation;</li>
+   *   <li>If not, proceeds with the commit.</li>
+   * </ul>
+   */
+  static class SparkDataSourceWriteStatusValidator implements 
WriteStatusValidator {
+
+    private final WriteOperationType writeOperationType;
+    private final AtomicBoolean hasErrored;
+
+    public SparkDataSourceWriteStatusValidator(WriteOperationType 
writeOperationType, AtomicBoolean hasErrored) {
+      this.writeOperationType = writeOperationType;
+      this.hasErrored = hasErrored;
+    }
+
+    @Override
+    public boolean validate(long totalRecords, long totalErroredRecords, 
Option<HoodieData<WriteStatus>> writeStatusesOpt) {
+      if (totalErroredRecords > 0) {
+        hasErrored.set(true);
+        ValidationUtils.checkArgument(writeStatusesOpt.isPresent(), "RDD 
<WriteStatus> expected to be present when there are errors");
+        LOG.error("{} failed with errors", writeOperationType);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Printing out the top 100 errors");
+
+          
HoodieJavaRDD.getJavaRDD(writeStatusesOpt.get()).filter(WriteStatus::hasErrors)
+              .take(100)
+              .forEach(ws -> {
+                LOG.trace("Global error:", ws.getGlobalError());
+                if (!ws.getErrors().isEmpty()) {
+                  ws.getErrors().forEach((k, v) -> LOG.trace("Error for key 
{}: {}", k, v));
+                }
+              });
+        }
+        return false;
+      } else {
+        return true;
+      }
+    }
+  }
 }
+
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e6588053425..315105816b6 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
 import 
org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
 import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, 
convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
 import 
org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
+import org.apache.hudi.DataSourceUtils.SparkDataSourceWriteStatusValidator
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
 import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams
@@ -81,6 +82,7 @@ import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.types.StructType
 import org.slf4j.LoggerFactory
 
+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.function.BiConsumer
 
 import scala.collection.JavaConverters._
@@ -983,19 +985,21 @@ class HoodieSparkSqlWriterInternal {
                                              tableInstantInfo: 
TableInstantInfo,
                                              extraPreCommitFn: 
Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]]
                                             ): (Boolean, 
HOption[java.lang.String], HOption[java.lang.String]) = {
-    if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 
0) {
-      log.info("Proceeding to commit the write.")
-      // get extra metadata from props
-      // 1. properties starting with commit metadata key prefix
-      // 2. properties related to checkpoint in spark streaming
-      val extraMetadataOpt = 
common.util.Option.of(DataSourceUtils.getExtraMetadata(parameters.asJava))
-      val commitSuccess =
-        client.commit(tableInstantInfo.instantTime, 
writeResult.getWriteStatuses,
-          extraMetadataOpt,
-          tableInstantInfo.commitActionType,
-          writeResult.getPartitionToReplaceFileIds,
-          common.util.Option.ofNullable(extraPreCommitFn.orNull))
-
+    val hasErrors = new AtomicBoolean(false)
+    log.info("Proceeding to commit the write.")
+    // get extra metadata from props
+    // 1. properties starting with commit metadata key prefix
+    // 2. properties related to checkpoint in spark streaming
+    val extraMetadataOpt = 
common.util.Option.of(DataSourceUtils.getExtraMetadata(parameters.asJava))
+    val commitSuccess =
+      client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
+        extraMetadataOpt,
+        tableInstantInfo.commitActionType,
+        writeResult.getPartitionToReplaceFileIds,
+        common.util.Option.ofNullable(extraPreCommitFn.orNull),
+        org.apache.hudi.common.util.Option.of(new 
SparkDataSourceWriteStatusValidator(tableInstantInfo.operation, hasErrors)))
+
+    if (!hasErrors.get()) {
       if (commitSuccess) {
         log.info("Commit " + tableInstantInfo.instantTime + " successful!")
       }
@@ -1029,19 +1033,6 @@ class HoodieSparkSqlWriterInternal {
       log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
       (commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
     } else {
-      log.error(s"${tableInstantInfo.operation} failed with errors")
-      if (log.isTraceEnabled) {
-        log.trace("Printing out the top 100 errors")
-        writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)
-          .take(100)
-          .foreach(ws => {
-            log.trace("Global error :", ws.getGlobalError)
-            if (ws.getErrors.size() > 0) {
-              ws.getErrors.asScala.foreach(kt =>
-                log.trace(s"Error for key: ${kt._1}", kt._2))
-            }
-          })
-      }
       (false, common.util.Option.empty(), common.util.Option.empty())
     }
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index c5fa5d7ffc8..05c93ba0e8b 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -925,8 +925,11 @@ public class HoodieStreamer implements Serializable {
 
     @Override
     protected boolean requestShutdownIfNeeded(Option<HoodieData<WriteStatus>> 
lastWriteStatuses) {
-      Option<JavaRDD<WriteStatus>> lastWriteStatusRDD = 
Option.ofNullable(lastWriteStatuses.isPresent() ? 
HoodieJavaRDD.getJavaRDD(lastWriteStatuses.get()) : null);
-      return postWriteTerminationStrategy.isPresent() && 
postWriteTerminationStrategy.get().shouldShutdown(lastWriteStatusRDD);
+      if (postWriteTerminationStrategy.isPresent()) {
+        Option<JavaRDD<WriteStatus>> lastWriteStatusRDD = 
lastWriteStatuses.map(HoodieJavaRDD::getJavaRDD);
+        return 
postWriteTerminationStrategy.get().shouldShutdown(lastWriteStatusRDD);
+      }
+      return false;
     }
 
     /**
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index c36510a0db7..9766076a94a 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -27,6 +27,7 @@ import org.apache.hudi.HoodieSchemaUtils;
 import org.apache.hudi.HoodieSparkSqlWriter;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.callback.common.WriteStatusValidator;
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -39,6 +40,7 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -65,6 +67,7 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetaSyncException;
@@ -128,6 +131,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -799,93 +803,48 @@ public class StreamSync implements Serializable, 
Closeable {
       // write to hudi and fetch result
       WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch, 
instantTime, useRowWriter);
       Map<String, List<String>> partitionToReplacedFileIds = 
writeClientWriteResult.getPartitionToReplacedFileIds();
-      // write to error table
-      JavaRDD<WriteStatus> dataTableWriteStatusRDD = 
writeClientWriteResult.getWriteStatusRDD();
-      JavaRDD<WriteStatus> writeStatusRDD = dataTableWriteStatusRDD;
+      JavaRDD<WriteStatus> writeStatusRDD = 
writeClientWriteResult.getWriteStatusRDD();
       String errorTableInstantTime = writeClient.createNewInstantTime();
       Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt = 
Option.empty();
       if (errorTableWriter.isPresent() && isErrorTableWriteUnificationEnabled) 
{
         errorTableWriteStatusRDDOpt = errorTableWriter.map(w -> 
w.upsert(errorTableInstantTime, instantTime, getLatestCommittedInstant()));
-        writeStatusRDD = errorTableWriteStatusRDDOpt.map(errorTableWriteStatus 
-> 
errorTableWriteStatus.union(dataTableWriteStatusRDD)).orElse(dataTableWriteStatusRDD);
       }
-      // process write status
-      long totalErrorRecords = 
writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
-      long totalRecords = 
writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
-      long totalSuccessfulRecords = totalRecords - totalErrorRecords;
-      LOG.info("instantTime={}, totalRecords={}, totalErrorRecords={}, 
totalSuccessfulRecords={}",
-          instantTime, totalRecords, totalErrorRecords, 
totalSuccessfulRecords);
-      if (totalRecords == 0) {
-        LOG.info("No new data, perform empty commit.");
-      }
-      boolean hasErrors = totalErrorRecords > 0;
-      if (!hasErrors || cfg.commitOnErrors) {
-        Map<String, String> checkpointCommitMetadata = 
extractCheckpointMetadata(inputBatch, props, 
writeClient.getConfig().getWriteVersion().versionCode(), cfg);
 
-        if (hasErrors) {
-          LOG.warn("Some records failed to be merged but forcing commit since 
commitOnErrors set. Errors/Total="
-              + totalErrorRecords + "/" + totalRecords);
-        }
-        String commitActionType = 
CommitUtils.getCommitActionType(cfg.operation, 
HoodieTableType.valueOf(cfg.tableType));
-        if (errorTableWriter.isPresent()) {
-          boolean errorTableSuccess = true;
-          // Commit the error events triggered so far to the error table
-          if (isErrorTableWriteUnificationEnabled && 
errorTableWriteStatusRDDOpt.isPresent()) {
-            errorTableSuccess = 
errorTableWriter.get().commit(errorTableInstantTime, 
errorTableWriteStatusRDDOpt.get());
-          } else if (!isErrorTableWriteUnificationEnabled) {
-            errorTableSuccess = 
errorTableWriter.get().upsertAndCommit(instantTime, 
getLatestCommittedInstant());
-          }
-          if (!errorTableSuccess) {
-            switch (errorWriteFailureStrategy) {
-              case ROLLBACK_COMMIT:
-                LOG.info("Commit " + instantTime + " failed!");
-                writeClient.rollback(instantTime);
-                throw new HoodieStreamerWriteException("Error table commit 
failed");
-              case LOG_ERROR:
-                LOG.error("Error Table write failed for instant " + 
instantTime);
-                break;
-              default:
-                throw new HoodieStreamerWriteException("Write failure strategy 
not implemented for " + errorWriteFailureStrategy);
-            }
-          }
+      Map<String, String> checkpointCommitMetadata = 
extractCheckpointMetadata(inputBatch, props, 
writeClient.getConfig().getWriteVersion().versionCode(), cfg);
+      AtomicLong totalSuccessfulRecords = new AtomicLong(0);
+      Option<String> latestCommittedInstant = getLatestCommittedInstant();
+      WriteStatusValidator writeStatusValidator = new 
HoodieStreamerWriteStatusValidator(cfg.commitOnErrors, instantTime,
+          cfg, errorTableWriter, errorTableWriteStatusRDDOpt, 
errorWriteFailureStrategy, isErrorTableWriteUnificationEnabled, 
errorTableInstantTime, writeClient, latestCommittedInstant,
+          totalSuccessfulRecords);
+      String commitActionType = CommitUtils.getCommitActionType(cfg.operation, 
HoodieTableType.valueOf(cfg.tableType));
+
+      boolean success = writeClient.commit(instantTime, writeStatusRDD, 
Option.of(checkpointCommitMetadata), commitActionType, 
partitionToReplacedFileIds, Option.empty(),
+          Option.of(writeStatusValidator));
+      releaseResourcesInvoked = true;
+      if (success) {
+        LOG.info("Commit " + instantTime + " successful!");
+        
this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch() 
!= null
+            ? inputBatch.getCheckpointForNextBatch().getCheckpointKey() : 
null);
+        // Schedule compaction if needed
+        if (cfg.isAsyncCompactionEnabled()) {
+          scheduledCompactionInstant = 
writeClient.scheduleCompaction(Option.empty());
         }
-        boolean success = writeClient.commit(instantTime, 
dataTableWriteStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, 
partitionToReplacedFileIds, Option.empty());
-        releaseResourcesInvoked = true;
-        if (success) {
-          LOG.info("Commit " + instantTime + " successful!");
-          
this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch() 
!= null
-              ? inputBatch.getCheckpointForNextBatch().getCheckpointKey() : 
null);
-          // Schedule compaction if needed
-          if (cfg.isAsyncCompactionEnabled()) {
-            scheduledCompactionInstant = 
writeClient.scheduleCompaction(Option.empty());
-          }
 
-          if ((totalSuccessfulRecords > 0) || cfg.forceEmptyMetaSync) {
-            runMetaSync();
-          } else {
-            LOG.info(String.format("Not running metaSync 
totalSuccessfulRecords=%d", totalSuccessfulRecords));
-          }
+        if ((totalSuccessfulRecords.get() > 0) || cfg.forceEmptyMetaSync) {
+          runMetaSync();
         } else {
-          LOG.info("Commit " + instantTime + " failed!");
-          throw new HoodieStreamerWriteException("Commit " + instantTime + " 
failed!");
+          LOG.info(String.format("Not running metaSync 
totalSuccessfulRecords=%d", totalSuccessfulRecords.get()));
         }
       } else {
-        LOG.error("Delta Sync found errors when writing. Errors/Total=" + 
totalErrorRecords + "/" + totalRecords);
-        LOG.error("Printing out the top 100 errors");
-        
dataTableWriteStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
-          LOG.error("Global error :", ws.getGlobalError());
-          if (ws.getErrors().size() > 0) {
-            ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" 
+ key + " is " + value));
-          }
-        });
-        // Rolling back instant
-        writeClient.rollback(instantTime);
-        throw new HoodieStreamerWriteException("Commit " + instantTime + " 
failed and rolled-back !");
+        LOG.info("Commit " + instantTime + " failed!");
+        throw new HoodieStreamerWriteException("Commit " + instantTime + " 
failed!");
       }
+
       long overallTimeNanos = overallTimerContext != null ? 
overallTimerContext.stop() : 0;
 
       // Send DeltaStreamer Metrics
       metrics.updateStreamerMetrics(overallTimeNanos);
-      return Pair.of(scheduledCompactionInstant, dataTableWriteStatusRDD);
+      return Pair.of(scheduledCompactionInstant, writeStatusRDD);
     } finally {
       if (!releaseResourcesInvoked) {
         releaseResources(instantTime);
@@ -1339,4 +1298,114 @@ public class StreamSync implements Serializable, 
Closeable {
       return writeStatusRDD;
     }
   }
+
+  /**
+   * WriteStatus Validator for commits to hoodie streamer data table.
+   * Writes to the error table are handled here as well.
+   */
+  static class HoodieStreamerWriteStatusValidator implements 
WriteStatusValidator {
+
+    private final boolean commitOnErrors;
+    private final String instantTime;
+    private final HoodieStreamer.Config cfg;
+    private final Option<BaseErrorTableWriter> errorTableWriter;
+    private final Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt;
+    private final HoodieErrorTableConfig.ErrorWriteFailureStrategy 
errorWriteFailureStrategy;
+    private final boolean isErrorTableWriteUnificationEnabled;
+    private final String errorTableInstantTime;
+    private final SparkRDDWriteClient writeClient;
+    private final Option<String> latestCommittedInstant;
+    private final AtomicLong totalSuccessfulRecords;
+
+    HoodieStreamerWriteStatusValidator(boolean commitOnErrors,
+                                       String instantTime,
+                                       HoodieStreamer.Config cfg,
+                                       Option<BaseErrorTableWriter> 
errorTableWriter,
+                                       Option<JavaRDD<WriteStatus>> 
errorTableWriteStatusRDDOpt,
+                                       
HoodieErrorTableConfig.ErrorWriteFailureStrategy errorWriteFailureStrategy,
+                                       boolean 
isErrorTableWriteUnificationEnabled,
+                                       String errorTableInstantTime,
+                                       SparkRDDWriteClient writeClient,
+                                       Option<String> latestCommittedInstant,
+                                       AtomicLong totalSuccessfulRecords) {
+      this.commitOnErrors = commitOnErrors;
+      this.instantTime = instantTime;
+      this.cfg = cfg;
+      this.errorTableWriter = errorTableWriter;
+      this.errorTableWriteStatusRDDOpt = errorTableWriteStatusRDDOpt;
+      this.errorWriteFailureStrategy = errorWriteFailureStrategy;
+      this.isErrorTableWriteUnificationEnabled = 
isErrorTableWriteUnificationEnabled;
+      this.errorTableInstantTime = errorTableInstantTime;
+      this.writeClient = writeClient;
+      this.latestCommittedInstant = latestCommittedInstant;
+      this.totalSuccessfulRecords = totalSuccessfulRecords;
+    }
+
+    @Override
+    public boolean validate(long tableTotalRecords, long 
tableTotalErroredRecords, Option<HoodieData<WriteStatus>> writeStatusesOpt) {
+
+      long totalRecords = tableTotalRecords;
+      long totalErroredRecords = tableTotalErroredRecords;
+      if (isErrorTableWriteUnificationEnabled) {
+        totalRecords += errorTableWriteStatusRDDOpt.map(status -> 
status.mapToDouble(WriteStatus::getTotalRecords).sum().longValue()).orElse(0L);
+        totalErroredRecords += errorTableWriteStatusRDDOpt.map(status -> 
status.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue()).orElse(0L);
+      }
+      long totalSuccessfulRecords = totalRecords - totalErroredRecords;
+      this.totalSuccessfulRecords.set(totalSuccessfulRecords);
+      LOG.info("instantTime={}, totalRecords={}, totalErrorRecords={}, 
totalSuccessfulRecords={}",
+          instantTime, totalRecords, totalErroredRecords, 
totalSuccessfulRecords);
+      if (totalRecords == 0) {
+        LOG.info("No new data, perform empty commit.");
+      }
+      boolean hasErrorRecords = totalErroredRecords > 0;
+      if (!hasErrorRecords || commitOnErrors) {
+        if (hasErrorRecords) {
+          LOG.warn("Some records failed to be merged but forcing commit since 
commitOnErrors set. Errors/Total="
+              + totalErroredRecords + "/" + totalRecords);
+        }
+      }
+
+      if (errorTableWriter.isPresent()) {
+        boolean errorTableSuccess = true;
+        // Commit the error events triggered so far to the error table
+        if (isErrorTableWriteUnificationEnabled && 
errorTableWriteStatusRDDOpt.isPresent()) {
+          errorTableSuccess = 
errorTableWriter.get().commit(errorTableInstantTime, 
errorTableWriteStatusRDDOpt.get());
+        } else if (!isErrorTableWriteUnificationEnabled) {
+          errorTableSuccess = 
errorTableWriter.get().upsertAndCommit(instantTime, latestCommittedInstant);
+        }
+        if (!errorTableSuccess) {
+          switch (errorWriteFailureStrategy) {
+            case ROLLBACK_COMMIT:
+              LOG.info("Commit " + instantTime + " failed!");
+              writeClient.rollback(instantTime);
+              throw new HoodieStreamerWriteException("Error table commit 
failed");
+            case LOG_ERROR:
+              LOG.error("Error Table write failed for instant " + instantTime);
+              break;
+            default:
+              throw new HoodieStreamerWriteException("Write failure strategy 
not implemented for " + errorWriteFailureStrategy);
+          }
+        }
+      }
+      boolean canProceed = !hasErrorRecords || commitOnErrors;
+      if (canProceed) {
+        return canProceed;
+      } else {
+        LOG.error("Delta Sync found errors when writing. Errors/Total=" + 
totalErroredRecords + "/" + totalRecords);
+        LOG.error("Printing out the top 100 errors");
+        ValidationUtils.checkArgument(writeStatusesOpt.isPresent(), "RDD 
<WriteStatus> is expected to be present when there are errors ");
+        
HoodieJavaRDD.getJavaRDD(writeStatusesOpt.get()).filter(WriteStatus::hasErrors).take(100).forEach(writeStatus
 ->  {
+          LOG.error("Global error", writeStatus.getGlobalError());
+          if (!writeStatus.getErrors().isEmpty()) {
+            writeStatus.getErrors().forEach((k,v) -> {
+              LOG.trace("Error for key {} : {}", k, v);
+            });
+          }
+        });
+        // Rolling back instant
+        writeClient.rollback(instantTime);
+        throw new HoodieStreamerWriteException("Commit " + instantTime + " 
failed and rolled-back !");
+      }
+    }
+  }
 }

Reply via email to