bvaradar commented on a change in pull request #3153:
URL: https://github.com/apache/hudi/pull/3153#discussion_r662650025



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -194,6 +194,37 @@
   public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = 
AVRO_SCHEMA + ".externalTransformation";
   public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION 
= "false";
 
+  public static final String PRE_COMMIT_VALIDATORS = 
"hoodie.precommit.validators";
+  private static final String DEFAULT_PRE_COMMIT_VALIDATORS = "";
+  public static final String VALIDATOR_TABLE_VARIABLE = "<TABLE_NAME>";

Review comment:
       It would make the validation queries more flexible if both the before and after table names were individually configurable. Validation queries sometimes need to join the before and after tables, and keeping both names configurable allows that.
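A minimal sketch of what this could look like, substituting two separately named placeholders into a user-supplied query template. The `<TABLE_BEFORE>`/`<TABLE_AFTER>` variables and the `bindTables` helper are illustrative only; the PR currently exposes a single `<TABLE_NAME>` variable.

```java
public class ValidatorTableVars {
    // Illustrative placeholder names, not the PR's actual config values
    static final String TABLE_BEFORE_VAR = "<TABLE_BEFORE>";
    static final String TABLE_AFTER_VAR = "<TABLE_AFTER>";

    // Substitute both staged-table names into a validation query template
    static String bindTables(String queryTemplate, String beforeTable, String afterTable) {
        return queryTemplate
            .replace(TABLE_BEFORE_VAR, beforeTable)
            .replace(TABLE_AFTER_VAR, afterTable);
    }
}
```

With both names bound independently, a query such as `SELECT count(*) FROM <TABLE_BEFORE> b JOIN <TABLE_AFTER> a ON b.key = a.key` can compare the two states directly.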

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Validator framework to run sql queries and compare table state at different 
locations.
+ */
+public abstract class SqlQueryPreCommitValidator<T extends 
HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends 
SparkPreCommitValidator<T, I, K, O> {
+  private static final Logger LOG = 
LogManager.getLogger(SqlQueryPreCommitValidator.class);
+  private static final AtomicInteger TABLE_COUNTER = new AtomicInteger(0);
+
+  public SqlQueryPreCommitValidator(HoodieSparkTable<T> table, 
HoodieEngineContext engineContext, HoodieWriteConfig config) {
+    super(table, engineContext, config);
+  }
+
+  /**
+   * Takes as input the datasets of records 1) before clustering and 2) after clustering.
+   * Performs the required validation and throws an error if validation fails.
+   */
+  @Override
+  public void validateRecordsBeforeAndAfter(Dataset<Row> before, Dataset<Row> 
after, final Set<String> partitionsAffected) {
+    String hoodieTableName = "staged_table_" + TABLE_COUNTER.incrementAndGet();
+    String hoodieTableBeforeClustering = hoodieTableName + "_before";
+    String hoodieTableAfterClustering = hoodieTableName + "_after";

Review comment:
       Can you also take one pass at the code and rename the variables? The validator needs to be agnostic to commit vs. clustering operations, so please name them accordingly.
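For example (a sketch only; the `_before_write`/`_after_write` suffixes are illustrative, not names from the PR), the staged view names could avoid any clustering-specific wording:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class StagedViewNames {
    private static final AtomicInteger TABLE_COUNTER = new AtomicInteger(0);

    // Operation-agnostic names: the same validator then reads naturally
    // whether it runs after a commit, clustering, or any other write.
    static String[] nextBeforeAfterNames() {
        String base = "staged_table_" + TABLE_COUNTER.incrementAndGet();
        return new String[] { base + "_before_write", base + "_after_write" };
    }
}
```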

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.validator.SparkPreCommitValidator;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import scala.collection.JavaConverters;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Spark validator utils to verify and run any precommit validators configured.
+ */
+public class SparkValidatorUtils {
+  private static final Logger LOG = LogManager.getLogger(SparkValidatorUtils.class);
+
+  /**
+   * Check configured pre-commit validators and run them. Note that this only works for COW tables.
+   *
+   * Throws an error if there are validation failures.
+   */
+  public static void runValidators(HoodieWriteConfig config,
+                                   HoodieWriteMetadata<JavaRDD<WriteStatus>> 
writeMetadata,
+                                   HoodieEngineContext context,
+                                   HoodieTable table,
+                                   String instantTime) {
+    if (StringUtils.isNullOrEmpty(config.getPreCommitValidators())) {
+      LOG.info("no validators configured.");
+    } else {
+      Set<String> partitionsModified;
+      if (writeMetadata.getWriteStats().isPresent()) {
+        partitionsModified = new 
HashSet<>(writeMetadata.getWriteStats().get().stream().map(writeStats ->
+            writeStats.getPartitionPath()).collect(Collectors.toList()));
+      } else {
+        partitionsModified = new 
HashSet<>(writeMetadata.getWriteStatuses().map(WriteStatus::getPartitionPath).collect());
+      }
+      SQLContext sqlContext = new 
SQLContext(HoodieSparkEngineContext.getSparkContext(context));
+      // Refresh the timeline to ensure the validator sees any other operations done on it
+      // (async operations such as concurrent clustering/compaction/rollback)
+      table.getMetaClient().reloadActiveTimeline();
+      Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext, 
partitionsModified, table).cache();
+      Dataset<Row> afterState  = getRecordsFromPendingCommits(sqlContext, 
partitionsModified, writeMetadata.getPartitionToReplaceFileIds(), 
table).cache();
+
+      Stream<SparkPreCommitValidator> validators = 
Arrays.stream(config.getPreCommitValidators().split(","))
+          .map(validatorClass -> {
+            return ((SparkPreCommitValidator) 
ReflectionUtils.loadClass(validatorClass,
+                new Class<?>[] {HoodieSparkTable.class, 
HoodieEngineContext.class, HoodieWriteConfig.class},
+                table, context, config));
+          });
+
+      boolean allSuccess = validators.map(v -> runValidatorAsync(v, 
writeMetadata, beforeState, afterState, 
instantTime)).map(CompletableFuture::join)
+          .reduce(true, Boolean::logicalAnd);
+
+      if (allSuccess) {
+        LOG.info("All validations succeeded");
+      } else {
+        LOG.error("At least one pre-commit validation failed");
+        throw new HoodieValidationException("At least one pre-commit 
validation failed");
+      }
+    }
+  }
+
+  /**
+   * Run validators in a separate threadpool for parallelism. Each of 
validator can submit a distributed spark job if needed.
+   */
+  private static CompletableFuture<Boolean> 
runValidatorAsync(SparkPreCommitValidator validator, HoodieWriteMetadata 
writeMetadata,
+                                                       Dataset<Row> 
beforeState, Dataset<Row> afterState, String instantTime) {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        validator.validate(instantTime, writeMetadata, beforeState, 
afterState);
+        LOG.info("validation complete for " + validator.getClass().getName());
+        return true;
+      } catch (HoodieValidationException e) {
+        LOG.error("validation failed for " + validator.getClass().getName());
+        return false;
+      }
+    });
+  }
+
+  /**
+   * Get records from partitions modified as a dataset.
+   * Note that this only works for COW tables.
+   */
+  public static Dataset<Row> getRecordsFromCommittedFiles(SQLContext 
sqlContext,
+                                                      Set<String> 
partitionsAffected, HoodieTable table) {
+
+    List<String> committedFiles = partitionsAffected.stream()
+        .flatMap(partition -> 
table.getBaseFileOnlyView().getLatestBaseFiles(partition).map(bf -> 
bf.getPath()))
+        .collect(Collectors.toList());
+
+    if (committedFiles.isEmpty()) {
+      return sqlContext.emptyDataFrame();
+    }
+    return readRecordsForBaseFiles(sqlContext, committedFiles);
+  }
+
+  /**
+   * Get records from specified list of data files.
+   */
+  public static Dataset<Row> readRecordsForBaseFiles(SQLContext sqlContext, 
List<String> baseFilePaths) {
+    return 
sqlContext.read().parquet(JavaConverters.asScalaBufferConverter(baseFilePaths).asScala());
+  }
+
+  /**
+   * Get records from partitions modified, including any inflight commits.
+   * Note that this only works for COW tables.
+   */
+  public static Dataset<Row> getRecordsFromPendingCommits(SQLContext 
sqlContext, 
+                                                          Set<String> 
partitionsAffected, 
+                                                          Map<String, 
List<String>> partitionToReplacedFileIds, 
+                                                          HoodieTable table) {
+
+    // build file system view with pending commits
+    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(table.getMetaClient(),
+        table.getActiveTimeline().getCommitsTimeline());
+
+    List<String> newFiles = partitionsAffected.stream().flatMap(partition -> {
+      return fsView.getLatestBaseFiles(partition)
+          .filter(bf -> !partitionToReplacedFileIds.getOrDefault(partition, 
Collections.emptyList()).contains(bf.getFileId()))

Review comment:
       Right. What I am proposing here is to have a class that inherits HoodieTableFileSystemView (something like HoodieTablePreCommitFileSystemView) which takes as arguments the existing HoodieTableFileSystemView (committed data) and the uncommitted metadata (HoodieWriteMetadata). Composing the views this way also works when multiple writes (inflights) happen concurrently, since HoodieTablePreCommitFileSystemView would automatically skip the inflights it sees from other parallel writes.
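A rough sketch of the composition being proposed, with plain Java stand-ins: the `CommittedView` interface and `preCommitFileIds` helper are illustrative, not Hudi APIs. The pre-commit view overlays this write's uncommitted output on the committed view and drops the file ids this write replaces; inflight files from other concurrent writers never enter either input, so they are skipped naturally.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PreCommitViewSketch {
    // Stand-in for the committed HoodieTableFileSystemView
    interface CommittedView {
        List<String> latestFileIds(String partition);
    }

    // Compose committed state with this write's uncommitted metadata:
    // keep committed files not replaced by this write, then append the
    // files this write produced.
    static List<String> preCommitFileIds(CommittedView committed,
                                         Map<String, List<String>> newFileIdsByPartition,
                                         Map<String, List<String>> replacedFileIdsByPartition,
                                         String partition) {
        List<String> replaced =
            replacedFileIdsByPartition.getOrDefault(partition, Collections.emptyList());
        return Stream.concat(
                committed.latestFileIds(partition).stream().filter(id -> !replaced.contains(id)),
                newFileIdsByPartition.getOrDefault(partition, Collections.emptyList()).stream())
            .collect(Collectors.toList());
    }
}
```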
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

