shangxinli commented on code in PR #18765:
URL: https://github.com/apache/hudi/pull/18765#discussion_r3300053599


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkWriteErrorValidator.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig.ValidationFailurePolicy;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/**
+ * Pre-commit validator that fails the commit when records failed to write.
+ *
+ * <p>Equivalent of the legacy {@code HoodieStreamerWriteStatusValidator}'s boolean error check
+ * ({@code hasErrorRecords = totalErrorRecords > 0}), wired through the pre-commit validator
+ * framework (issue #18750). Pure validation: no side effects (no error-table commit, no
+ * top-100 error logging, no instant rollback). Those side effects are handled separately by
+ * {@code StreamSync}'s pre-commit orchestration.</p>
+ *
+ * <p><b>Relationship with the inline write-error gate in {@code StreamSync}:</b> the default
+ * commit path in {@code StreamSync} already applies an equivalent error check via the
+ * {@code commitOnErrors} flag. This validator exists so that users running multiple validators
+ * (e.g. write-error + offset checks) can express a unified pass/fail story through a single
+ * {@code failure.policy} knob. Enabling this validator while leaving {@code commitOnErrors=false}
+ * means both checks run and either can block the commit — they are intentionally not mutually
+ * exclusive.</p>
+ *
+ * <p>Behavior mapping from the legacy HSWSV (data-table only — see caveat below):</p>
+ * <ul>
+ *   <li>{@code commitOnErrors = false} (HSWSV default) ↔ {@code failure.policy = FAIL}</li>
+ *   <li>{@code commitOnErrors = true} ↔ {@code failure.policy = WARN_LOG}</li>
+ * </ul>
+ *
+ * <p><b>Unification caveat:</b> when {@code hoodie.errortable.write.unification.enabled=true},
+ * HSWSV's error check summed errors across <em>both</em> the data-table and the error-table write

Review Comment:
   Good point. The Javadoc already states that both gates are intentionally
   independent, but the error message said 'or', which implied they are
   interchangeable. Fixed to say both are required.
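   For readers following the thread, the "both gates" behavior described in the Javadoc above can be sketched roughly as follows. This is a minimal illustration, not the code in the PR: the class `WriteErrorGates`, its methods, and the local `FailurePolicy` enum are hypothetical stand-ins, and only the two policy values named in the Javadoc (`FAIL`, `WARN_LOG`) are modeled.

```java
// Hypothetical sketch: none of these names are Hudi APIs.
final class WriteErrorGates {

  /** Stand-in for the two failure.policy values named in the Javadoc. */
  enum FailurePolicy { FAIL, WARN_LOG }

  private WriteErrorGates() {
  }

  /** Inline gate: write errors block the commit unless commitOnErrors is set. */
  static boolean inlineGatePasses(long totalErrorRecords, boolean commitOnErrors) {
    return totalErrorRecords == 0 || commitOnErrors;
  }

  /** Validator gate: write errors block the commit only under the FAIL policy. */
  static boolean validatorPasses(long totalErrorRecords, FailurePolicy policy) {
    return totalErrorRecords == 0 || policy != FailurePolicy.FAIL;
  }

  /** Both gates run independently; the commit proceeds only if neither blocks it. */
  static boolean canCommit(long totalErrorRecords, boolean commitOnErrors, FailurePolicy policy) {
    return inlineGatePasses(totalErrorRecords, commitOnErrors)
        && validatorPasses(totalErrorRecords, policy);
  }
}
```

   Under this sketch, a batch with write errors commits only when `commitOnErrors` is true and the policy is `WARN_LOG`; relaxing just one of the two still blocks the commit, which is why an error message that says 'or' would mislead.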



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SuccessfulRecordCounter.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Computes record counts for a HoodieStreamer commit, summing across the data-table
+ * write statuses and (optionally) the error-table write statuses when error-table
+ * write unification is enabled.
+ *
+ * <p>Extracted from {@code HoodieStreamerWriteStatusValidator} (issue #18750) so the
+ * counting logic can be invoked from the explicit pre-commit orchestration in
+ * {@code StreamSync} without going through the {@code WriteStatusValidator} callback.</p>
+ */
+public final class SuccessfulRecordCounter {
+
+  private SuccessfulRecordCounter() {
+  }
+
+  /**
+   * Compute total / errored / successful record counts from a pre-collected list of write statuses.
+   *
+   * @param dataTableWriteStatuses           Pre-collected data-table write statuses. Must not be null.
+   * @param errorTableWriteStatusRDDOpt      Optional error-table write status RDD; only consulted
+   *                                         when unification is enabled. Must not be null
+   *                                         ({@link Option#empty()} when no error table).
+   * @param isErrorTableWriteUnificationEnabled Whether error-table records contribute to the totals.
+   * @return immutable {@link Counts} snapshot.
+   */
+  public static Counts compute(List<WriteStatus> dataTableWriteStatuses,
+                               Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt,
+                               boolean isErrorTableWriteUnificationEnabled) {
+    Objects.requireNonNull(dataTableWriteStatuses, "dataTableWriteStatuses");
+    Objects.requireNonNull(errorTableWriteStatusRDDOpt, "errorTableWriteStatusRDDOpt");
+
+    long totalRecords = 0L;
+    long totalErrorRecords = 0L;
+    for (WriteStatus ws : dataTableWriteStatuses) {
+      totalRecords += ws.getTotalRecords();
+      totalErrorRecords += ws.getTotalErrorRecords();
+    }
+    if (isErrorTableWriteUnificationEnabled && errorTableWriteStatusRDDOpt.isPresent()) {
+      JavaRDD<WriteStatus> errorRdd = errorTableWriteStatusRDDOpt.get();
+      totalRecords += sumLong(errorRdd, WriteStatus::getTotalRecords);
+      totalErrorRecords += sumLong(errorRdd, WriteStatus::getTotalErrorRecords);

Review Comment:
   Agreed. Filed apache/hudi#18841 to track registering it as default in the 
next release.
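   As context for the hunk quoted above, here is a Spark-free sketch of the summation it performs. It is only an approximation under stated assumptions: `RecordCountSketch`, `Status`, and this `Counts` are hypothetical stand-ins for the PR's `SuccessfulRecordCounter`, `WriteStatus`, and `Counts`; plain lists replace the error-table RDD; and deriving the successful count as total minus errors is an assumption, since that part of the diff is not shown in this thread.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical sketch: plain lists stand in for Spark RDDs and Hudi's WriteStatus.
final class RecordCountSketch {

  /** Minimal stand-in carrying the two counters read in the quoted hunk. */
  static final class Status {
    final long totalRecords;
    final long totalErrorRecords;

    Status(long totalRecords, long totalErrorRecords) {
      this.totalRecords = totalRecords;
      this.totalErrorRecords = totalErrorRecords;
    }
  }

  /** Immutable snapshot of the summed counts. */
  static final class Counts {
    final long total;
    final long errors;
    final long successful; // assumption: total minus errors

    Counts(long total, long errors) {
      this.total = total;
      this.errors = errors;
      this.successful = total - errors;
    }
  }

  private RecordCountSketch() {
  }

  static Counts compute(List<Status> dataTable, List<Status> errorTable, boolean unificationEnabled) {
    Objects.requireNonNull(dataTable, "dataTable");
    Objects.requireNonNull(errorTable, "errorTable");
    long total = 0L;
    long errors = 0L;
    for (Status s : dataTable) {
      total += s.totalRecords;
      errors += s.totalErrorRecords;
    }
    // Error-table statuses contribute only when unification is enabled.
    if (unificationEnabled) {
      for (Status s : errorTable) {
        total += s.totalRecords;
        errors += s.totalErrorRecords;
      }
    }
    return new Counts(total, errors);
  }
}
```

   With unification disabled, the error-table list is ignored entirely, mirroring the `isErrorTableWriteUnificationEnabled` guard in the diff.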


