codope commented on code in PR #18405: URL: https://github.com/apache/hudi/pull/18405#discussion_r3014342428
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instant Commit instant time
+   * @param writeStatusRDD Write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instant,
+                                   JavaRDD<WriteStatus> writeStatusRDD,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());

Review Comment:
There is an existing [SparkValidatorUtils](https://github.com/apache/hudi/blob/bef0c54ac5b4f1c88cca0bffcc8fb2be8ad33673/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java#L86) that reads the same config key. If a user configures `hoodie.precommit.validators=org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator`, could the two collide?
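The collision the question above has in mind can be reproduced outside Hudi with plain JDK reflection. This is a hypothetical sketch throughout. `StreamingValidator`, `WritePathValidator`, `OffsetValidator`, and the `load` helper are stand-ins, not Hudi classes, and `load` is not `ReflectionUtils.loadClass`. It shows a class written for one loader being picked up by a second loader on the same key. That second loader expects a different constructor shape or base type. The sketch also shows the separate-key alternative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Sketch: two reflective loaders that read the same config key but expect different
 * constructor shapes and base types. All names here are hypothetical stand-ins.
 */
public class ValidatorKeyCollisionSketch {

  /** Hypothetical stand-in for validators built from a (Properties) constructor. */
  public interface StreamingValidator {
    void validate();
  }

  /** Hypothetical stand-in for the write-path validator family (a different base type). */
  public interface WritePathValidator {
    void validate();
  }

  /** Hypothetical offset validator: only a (Properties) constructor, only StreamingValidator. */
  public static class OffsetValidator implements StreamingValidator {
    public OffsetValidator(Properties props) {
    }

    @Override
    public void validate() {
      // The real offset comparison would go here; omitted in this sketch.
    }
  }

  /**
   * Instantiates every comma-separated class name under {@code key} via the constructor whose
   * parameter types are {@code ctorTypes}, then casts it to {@code expectedType}.
   * A missing constructor surfaces as NoSuchMethodException; a wrong base type as ClassCastException.
   */
  public static <T> List<T> load(Properties props, String key, Class<T> expectedType,
                                 Class<?>[] ctorTypes, Object... ctorArgs)
      throws ReflectiveOperationException {
    List<T> loaded = new ArrayList<>();
    for (String name : props.getProperty(key, "").split(",")) {
      String className = name.trim();
      if (className.isEmpty()) {
        continue; // unset key or stray comma
      }
      Object instance = Class.forName(className).getConstructor(ctorTypes).newInstance(ctorArgs);
      loaded.add(expectedType.cast(instance));
    }
    return loaded;
  }
}
```

Suppose `OffsetValidator` is listed under the shared `hoodie.precommit.validators` key. A write-path style call such as `load(props, "hoodie.precommit.validators", WritePathValidator.class, new Class<?>[] {String.class, Integer.class}, "table", 1)` then fails with `NoSuchMethodException`. Even with a matching constructor shape, the cast still fails with `ClassCastException`. Listing the class under a separate key such as `hoodie.precommit.streaming.validators` instead leaves the write-path list empty. How Hudi's own `ReflectionUtils` reports these failures, directly or wrapped, is not assumed here.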
IIRC, the write operation itself (e.g., `writeClient.insert()`) goes through `BaseSparkCommitActionExecutor` -> `updateIndexAndMaybeRunPreCommitValidations()` -> `runPrecommitValidators()` -> `SparkValidatorUtils.runValidators()`, which does:

```java
// SparkValidatorUtils.java, lines 86-89
Stream<SparkPreCommitValidator> validators = Arrays.stream(config.getPreCommitValidators().split(","))
    .map(validatorClass -> ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
        new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
        table, context, config)));
```

This could fail in two ways:
1. `SparkKafkaOffsetValidator` has no `(HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)` constructor, so the reflective load fails.
2. `SparkKafkaOffsetValidator` extends `BasePreCommitValidator`, not `SparkPreCommitValidator`, so the cast throws a `ClassCastException`.

If so, we could either read the new class from a separate config (e.g., `hoodie.precommit.streaming.validators`) or make `SparkKafkaOffsetValidator` also extend `SparkPreCommitValidator` with a compatible constructor.

##########
bootstrap_register_only_issue.md:
##########

Review Comment:
This looks like an unrelated file that was checked in. Maybe remove it?

##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,172 @@
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Collect write statuses and build context
+    List<WriteStatus> allWriteStatus = writeStatusRDD.collect();

Review Comment:
Is `writeStatusRDD` cached? If not, this `collect()` can force a second evaluation of the RDD lineage. Collecting every `WriteStatus` on the driver could also cause a driver OOM.

##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,237 @@
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link SparkStreamerValidatorUtils}.
+ *
+ * <p>Uses a lightweight Spark context for JavaRDD creation. Tests validate the orchestration
+ * logic (class loading, config passing, error handling) using first-commit scenarios
+ * (no previous commit on timeline) to avoid needing a full HoodieTable setup.</p>
+ */
+public class TestSparkStreamerValidatorUtils {
+
+  private static JavaSparkContext jsc;
+
+  @TempDir
+  Path tempDir;
+
+  @BeforeAll
+  public static void setUp() {
+    SparkConf conf = new SparkConf()
+        .setAppName("TestSparkStreamerValidatorUtils")
+        .setMaster("local[2]")
+        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
+    jsc = new JavaSparkContext(conf);
+  }
+
+  @AfterAll
+  public static void tearDown() {
+    if (jsc != null) {
+      jsc.stop();
+      jsc = null;
+    }
+  }
+
+  private static TypedProperties propsWithValidator(String validatorClassName) {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), validatorClassName);
+    props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+    return props;
+  }
+
+  private static WriteStatus buildWriteStatus(String partitionPath, long numInserts, long numUpdates) {
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setPartitionPath(partitionPath);
+    stat.setNumInserts(numInserts);
+    stat.setNumUpdateWrites(numUpdates);
+
+    WriteStatus ws = new WriteStatus(false, 0.0);
+    ws.setStat(stat);
+    return ws;
+  }
+
+  private JavaRDD<WriteStatus> toRDD(List<WriteStatus> writeStatuses) {
+    return jsc.parallelize(writeStatuses);
+  }
+
+  private org.apache.hudi.common.table.HoodieTableMetaClient createMetaClient() throws IOException {
+    return org.apache.hudi.common.testutils.HoodieTestUtils.init(
+        tempDir.toAbsolutePath().toString());
+  }
+
+  // ========== Tests ==========

Review Comment:
All the tests here use first-commit scenarios (empty timeline), where the validator skips validation. The e2e path would be better covered by an actual HoodieStreamer pipeline with two commits, where the second commit triggers a real offset comparison.

##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,172 @@
+    // Collect write statuses and build context
+    List<WriteStatus> allWriteStatus = writeStatusRDD.collect();
+    HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata);
+    List<HoodieWriteStat> writeStats = allWriteStatus.stream()
+        .map(WriteStatus::getStat)
+        .collect(Collectors.toList());
+
+    // Load previous commit metadata from timeline
+    Option<HoodieCommitMetadata> previousCommitMetadata = loadPreviousCommitMetadata(metaClient);
+
+    ValidationContext context = new SparkValidationContext(
+        instant,
+        Option.of(currentMetadata),
+        Option.of(writeStats),
+        previousCommitMetadata,
+        metaClient);
+
+    // Instantiate and run each validator
+    List<String> classNames = Arrays.stream(validatorClassNames.split(","))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .collect(Collectors.toList());
+
+    for (String className : classNames) {
+      try {
+        BasePreCommitValidator validator = (BasePreCommitValidator)
+            ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props);
+        LOG.info("Running pre-commit validator: {} for instant: {}", className, instant);
+        validator.validateWithMetadata(context);
+        LOG.info("Pre-commit validator {} passed for instant: {}", className, instant);
+      } catch (HoodieValidationException e) {
+        LOG.error("Pre-commit validator {} failed for instant: {}", className, instant, e);
+        throw e;
+      } catch (Exception e) {
+        LOG.error("Failed to instantiate or run validator: {}", className, e);
+        throw new HoodieValidationException(
+            "Failed to run pre-commit validator: " + className, e);
+      }
+    }
+  }
+
+  /**
+   * Build HoodieCommitMetadata from write statuses and extra metadata.
+   * This constructs the metadata object that would be committed, giving
+   * validators access to the same data.
+   */
+  private static HoodieCommitMetadata buildCommitMetadata(

Review Comment:
There is already a util, `CommitUtils.buildMetadata()`, that can be reused here. Also, this new method reconstructs only a partial `HoodieCommitMetadata`, which could mislead future validators that expect complete metadata.

##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -872,6 +873,10 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
         totalSuccessfulRecords);
     String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
+    // Run pre-commit streaming offset validators (if configured) before commit
+    SparkStreamerValidatorUtils.runValidators(props, instantTime, writeStatusRDD,
+        checkpointCommitMetadata, metaClient);

Review Comment:
The validator runs before the success/error handling below. This obscures the actual failure mode, and the existing `commitOnErrors` logic never gets a chance to handle it. Why not run it after?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
