This is an automated email from the ASF dual-hosted git repository.

nsivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7af8cdf2c20e perf(streamer): fold validate() error-table WriteStatus sums into one pass (#18871)
7af8cdf2c20e is described below

commit 7af8cdf2c20e8ac5c57adaaad9fe4279974a5877
Author: Davis-Zhang-Onehouse <[email protected]>
AuthorDate: Thu May 28 12:06:10 2026 -0700

    perf(streamer): fold validate() error-table WriteStatus sums into one pass (#18871)
    
    * perf(streamer): fold validate() error-table WriteStatus sums into one pass
---
 .../apache/hudi/utilities/streamer/StreamSync.java | 40 ++++++++++-
 .../TestStreamSyncWriteStatusValidation.java       | 83 ++++++++++++++++++++++
 2 files changed, 120 insertions(+), 3 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 2b1406c7deab..edeafb2755e3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -1403,6 +1403,39 @@ public class StreamSync implements Serializable, Closeable {
     }
   }
 
+  /**
+   * Sums {@link WriteStatus#getTotalRecords()} and {@link WriteStatus#getTotalErrorRecords()} over the
+   * given RDD in a single Spark action, returned as a {@code (totalRecords, totalErroredRecords)} tuple.
+   *
+   * <p>Folding both counters into one {@code aggregate} pass avoids re-deserializing every cached
+   * {@link WriteStatus} block a second time. Issuing two separate {@code mapToDouble(...).sum()}
+   * actions on the persisted error-table {@code WriteStatus} RDD doubles Kryo deserialization of
+   * cached partitions during commit validation and adds gratuitous heap pressure on memory-strained
+   * executors.
+   *
+   * <p>{@code aggregate} is used (instead of {@code mapPartitions(...).reduce(...)}) so that a
+   * 0-partition RDD (e.g. {@code sc.emptyRDD()}, which {@link BaseErrorTableWriter#upsert} can
+   * return for an empty commit) returns {@code (0L, 0L)} rather than raising
+   * {@code UnsupportedOperationException} as {@code reduce} would. The mutable {@code long[2]}
+   * accumulator keeps per-record allocations at zero.
+   */
+  @VisibleForTesting
+  static Tuple2<Long, Long> sumRecordAndErrorCounts(JavaRDD<WriteStatus> writeStatuses) {
+    long[] counts = writeStatuses.aggregate(
+        new long[]{0L, 0L},
+        (acc, status) -> {
+          acc[0] += status.getTotalRecords();
+          acc[1] += status.getTotalErrorRecords();
+          return acc;
+        },
+        (left, right) -> {
+          left[0] += right[0];
+          left[1] += right[1];
+          return left;
+        });
+    return new Tuple2<>(counts[0], counts[1]);
+  }
+
   /**
    * WriteStatus Validator for commits to hoodie streamer data table.
    * The writes to error table is taken care as well.
@@ -1447,9 +1480,10 @@ public class StreamSync implements Serializable, Closeable {
 
       long totalRecords = tableTotalRecords;
       long totalErroredRecords = tableTotalErroredRecords;
-      if (isErrorTableWriteUnificationEnabled) {
-        totalRecords += errorTableWriteStatusRDDOpt.map(status -> status.mapToDouble(WriteStatus::getTotalRecords).sum().longValue()).orElse(0L);
-        totalErroredRecords += errorTableWriteStatusRDDOpt.map(status -> status.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue()).orElse(0L);
+      if (isErrorTableWriteUnificationEnabled && errorTableWriteStatusRDDOpt.isPresent()) {
+        Tuple2<Long, Long> errorTableCounts = sumRecordAndErrorCounts(errorTableWriteStatusRDDOpt.get());
+        totalRecords += errorTableCounts._1;
+        totalErroredRecords += errorTableCounts._2;
       }
       long totalSuccessfulRecords = totalRecords - totalErroredRecords;
       this.totalSuccessfulRecords.set(totalSuccessfulRecords);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncWriteStatusValidation.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncWriteStatusValidation.java
new file mode 100644
index 000000000000..793cfd5de8ba
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncWriteStatusValidation.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link StreamSync#sumRecordAndErrorCounts(JavaRDD)}, the single-pass fold that replaces two
+ * separate {@code mapToDouble(...).sum()} actions in error-table commit validation.
+ */
+class TestStreamSyncWriteStatusValidation extends SparkClientFunctionalTestHarness {
+
+  @Test
+  void sumsTotalAndErroredRecordsAcrossPartitions() {
+    List<WriteStatus> writeStatuses = new ArrayList<>();
+    writeStatuses.add(writeStatus(10L, 3L));
+    writeStatuses.add(writeStatus(20L, 5L));
+    writeStatuses.add(writeStatus(7L, 0L));
+    // More partitions than elements forces an empty partition, exercising the aggregate fold.
+    JavaRDD<WriteStatus> writeStatusRDD = jsc().parallelize(writeStatuses, 4);
+
+    Tuple2<Long, Long> counts = StreamSync.sumRecordAndErrorCounts(writeStatusRDD);
+
+    assertEquals(37L, counts._1, "total records summed across all partitions");
+    assertEquals(8L, counts._2, "total errored records summed across all partitions");
+  }
+
+  @Test
+  void sumsToZeroWhenNoWriteStatusesPresent() {
+    JavaRDD<WriteStatus> emptyRDD = jsc().parallelize(new ArrayList<WriteStatus>(), 2);
+
+    Tuple2<Long, Long> counts = StreamSync.sumRecordAndErrorCounts(emptyRDD);
+
+    assertEquals(0L, counts._1);
+    assertEquals(0L, counts._2);
+  }
+
+  @Test
+  void sumsToZeroForZeroPartitionRDD() {
+    // BaseErrorTableWriter.upsert can return sc.emptyRDD() on an empty commit; JavaRDD.reduce throws
+    // UnsupportedOperationException on a 0-partition RDD, whereas aggregate returns the zero value.
+    JavaRDD<WriteStatus> emptyRDD = jsc().emptyRDD();
+
+    Tuple2<Long, Long> counts = StreamSync.sumRecordAndErrorCounts(emptyRDD);
+
+    assertEquals(0L, counts._1);
+    assertEquals(0L, counts._2);
+  }
+
+  private static WriteStatus writeStatus(long totalRecords, long totalErrorRecords) {
+    WriteStatus writeStatus = new WriteStatus();
+    writeStatus.setTotalRecords(totalRecords);
+    writeStatus.setTotalErrorRecords(totalErrorRecords);
+    return writeStatus;
+  }
+}

Reply via email to