This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b87e143cfe [HUDI-6115] Adding hardening checks for transformer output 
schema for quarantine enabled/disabled (#8520)
0b87e143cfe is described below

commit 0b87e143cfe237ddc005f610d208d1bf36432ba3
Author: harshal <[email protected]>
AuthorDate: Fri May 19 07:02:11 2023 +0530

    [HUDI-6115] Adding hardening checks for transformer output schema for 
quarantine enabled/disabled (#8520)
    
    - Adds ERROR_TABLE_CURRUPT_RECORD_COL_NAME as a null-valued column
       before the transformers run, if the error table is enabled and the
       column does not already exist in the dataset.
    - Adds validation that the ERROR_TABLE_CURRUPT_RECORD_COL_NAME column
       is still present in the output of every transformer when the error
       table is enabled (when it is disabled, the plain ChainedTransformer is
       used and no such check is performed).
---
 .../org/apache/hudi/utilities/UtilHelpers.java     |   7 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   4 +-
 .../utilities/deltastreamer/ErrorTableUtils.java   |  33 ++++-
 .../utilities/transform/ChainedTransformer.java    |   8 +-
 .../ErrorTableAwareChainedTransformer.java         |  58 ++++++++
 .../TestErrorTableAwareChainedTransformer.java     | 150 +++++++++++++++++++++
 6 files changed, 250 insertions(+), 10 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 721ba2eb9f4..16ed7eadc1f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -61,6 +61,7 @@ import org.apache.hudi.utilities.schema.postprocessor.ChainedSchemaPostProcessor
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
 import org.apache.hudi.utilities.transform.ChainedTransformer;
+import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
@@ -190,9 +191,36 @@ public class UtilHelpers {
 
   }
 
+  /**
+   * Creates a plain {@link ChainedTransformer} for the configured class names.
+   * Kept so existing callers of the single-argument API continue to compile; equivalent to
+   * {@code createTransformer(classNamesOpt, false)}.
+   */
   public static Option<Transformer> createTransformer(Option<List<String>> classNamesOpt) throws IOException {
+    return createTransformer(classNamesOpt, false);
+  }
+
+  /**
+   * Creates the transformer chain for the configured class names.
+   *
+   * @param classNamesOpt             transformer class names, optionally prefixed with an identifier
+   * @param isErrorTableWriterEnabled when true, an {@link ErrorTableAwareChainedTransformer} is built,
+   *                                  which adds the error table corrupt record column up front and
+   *                                  validates it after every transformer; otherwise a plain
+   *                                  {@link ChainedTransformer} is built
+   * @throws IOException if the transformer class(es) cannot be loaded
+   */
+  public static Option<Transformer> createTransformer(Option<List<String>> classNamesOpt,
+                                                      boolean isErrorTableWriterEnabled) throws IOException {
     try {
-      return classNamesOpt.map(classNames -> classNames.isEmpty() ? null : new ChainedTransformer(classNames));
+      return classNamesOpt.map(classNames -> {
+        if (classNames.isEmpty()) {
+          return null;
+        }
+        return isErrorTableWriterEnabled
+            ? new ErrorTableAwareChainedTransformer(classNames)
+            : new ChainedTransformer(classNames);
+      });
     } catch (Throwable e) {
       throw new IOException("Could not load transformer class(es) " + classNamesOpt.get(), e);
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 7d1d0758955..cbd19305e41 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -289,7 +289,5 @@ public class DeltaSync implements Serializable, Closeable {
     // Register User Provided schema first
     registerAvroSchemas(schemaProvider);
 
-    this.transformer = UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames));
-
     this.metrics = (HoodieIngestionMetrics) ReflectionUtils.loadClass(cfg.ingestionMetricsClass, getHoodieClientConfig(this.schemaProvider));
     this.hoodieMetrics = new HoodieMetrics(getHoodieClientConfig(this.schemaProvider));
@@ -306,6 +304,10 @@ public class DeltaSync implements Serializable, Closeable {
     this.formatAdapter = new SourceFormatAdapter(
         UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics),
         this.errorTableWriter, Option.of(props));
+
+    // Depends on this.errorTableWriter: when an error table writer is present, the
+    // error-table-aware transformer chain is created instead of the plain one.
+    this.transformer = UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames), this.errorTableWriter.isPresent());
   }
 
   /**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
index 881a9545461..76e7b030b6f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
@@ -28,14 +28,20 @@ import org.apache.hudi.config.HoodieErrorTableConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieValidationException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
 
 import java.lang.reflect.InvocationTargetException;
 import java.util.Arrays;
 
 import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_WRITE_CLASS;
 import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_WRITE_FAILURE_STRATEGY;
+import static org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME;
+import static org.apache.spark.sql.functions.lit;
 
 public final class ErrorTableUtils {
 
@@ -63,4 +69,41 @@ public final class ErrorTableUtils {
     String writeFailureStrategy = props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key());
     return HoodieErrorTableConfig.ErrorWriteFailureStrategy.valueOf(writeFailureStrategy);
   }
+
+  /**
+   * Validates that the error table corrupt record column is present in the given dataset.
+   * Called on transformer output when the error table is enabled, so that a transformer
+   * which drops the column fails fast.
+   *
+   * @param dataset transformer output to validate
+   * @throws HoodieValidationException if the column is not among the dataset's columns
+   */
+  public static void validate(Dataset<Row> dataset) {
+    if (!isErrorTableCorruptRecordColumnPresent(dataset)) {
+      throw new HoodieValidationException(String.format(
+          "Invalid condition, columnName=%s is not present in transformer output schema",
+          ERROR_TABLE_CURRUPT_RECORD_COL_NAME));
+    }
+  }
+
+  /**
+   * Adds the error table corrupt record column, filled with nulls, if the dataset does not
+   * already contain it; otherwise returns the dataset unchanged.
+   *
+   * @param dataset input dataset
+   * @return a dataset that contains the error table corrupt record column
+   */
+  public static Dataset<Row> addNullValueErrorTableCorruptRecordColumn(Dataset<Row> dataset) {
+    if (isErrorTableCorruptRecordColumnPresent(dataset)) {
+      return dataset;
+    }
+    // A bare lit(null) yields a column of Spark's NullType; cast it to a nullable string so the
+    // column has a concrete type (presumably the corrupt record payload is a string -- confirm
+    // against BaseErrorTableWriter).
+    return dataset.withColumn(ERROR_TABLE_CURRUPT_RECORD_COL_NAME, lit(null).cast(DataTypes.StringType));
+  }
+
+  private static boolean isErrorTableCorruptRecordColumnPresent(Dataset<Row> dataset) {
+    return Arrays.asList(dataset.columns()).contains(ERROR_TABLE_CURRUPT_RECORD_COL_NAME);
+  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
index 22100563204..cbc66ff4be7 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
@@ -46,7 +46,7 @@ public class ChainedTransformer implements Transformer {
   // Delimiter used to separate class name and the property key suffix. The 
suffix comes first.
   private static final String ID_TRANSFORMER_CLASS_NAME_DELIMITER = ":";
 
-  private final List<TransformerInfo> transformers;
+  protected final List<TransformerInfo> transformers; // protected so ErrorTableAwareChainedTransformer can iterate the chain
 
   public ChainedTransformer(List<Transformer> transformersList) {
     this.transformers = new ArrayList<>(transformersList.size());
@@ -109,7 +109,7 @@ public class ChainedTransformer implements Transformer {
     }
   }
 
-  private static class TransformerInfo {
+  protected static class TransformerInfo { // protected so subclasses can read each chain entry
     private final Transformer transformer;
     private final Option<String> idOpt;
 
@@ -123,7 +123,7 @@ public class ChainedTransformer implements Transformer {
       this.idOpt = Option.empty();
     }
 
-    private Transformer getTransformer() {
+    protected Transformer getTransformer() { // used by ErrorTableAwareChainedTransformer.apply
       return transformer;
     }
 
@@ -131,7 +131,7 @@ public class ChainedTransformer implements Transformer {
       return idOpt.isPresent();
     }
 
-    private TypedProperties getProperties(TypedProperties properties) {
+    protected TypedProperties getProperties(TypedProperties properties) { // used by ErrorTableAwareChainedTransformer.apply
       TypedProperties transformerProps = properties;
       if (idOpt.isPresent()) {
         // Transformer specific property keys end with the id associated with 
the transformer.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
new file mode 100644
index 00000000000..673be917c9e
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.deltastreamer.ErrorTableUtils;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.List;
+
+/**
+ * A {@link Transformer} that chains other {@link Transformer}s and applies them sequentially,
+ * for use when the error table is enabled.
+ *
+ * <p>Before the first transformer runs, the error table corrupt record column is added (with
+ * null values) if the input does not already contain it. After every transformer, the output
+ * is validated to still contain that column, so a transformer that drops it fails immediately
+ * even if a later transformer would add it back.
+ */
+public class ErrorTableAwareChainedTransformer extends ChainedTransformer {
+
+  /**
+   * Creates the chain from configured transformer class names, each optionally prefixed with
+   * an identifier.
+   *
+   * @param configuredTransformers transformer class names
+   * @param ignore                 unused; only keeps this constructor distinct from
+   *                               {@link #ErrorTableAwareChainedTransformer(List)} after type erasure
+   */
+  public ErrorTableAwareChainedTransformer(List<String> configuredTransformers, int... ignore) {
+    super(configuredTransformers);
+  }
+
+  /**
+   * Creates the chain from already-instantiated transformers.
+   *
+   * @param transformers transformers to apply, in order
+   */
+  public ErrorTableAwareChainedTransformer(List<Transformer> transformers) {
+    super(transformers);
+  }
+
+  @Override
+  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
+                            TypedProperties properties) {
+    Dataset<Row> dataset = ErrorTableUtils.addNullValueErrorTableCorruptRecordColumn(rowDataset);
+    for (TransformerInfo transformerInfo : transformers) {
+      Transformer transformer = transformerInfo.getTransformer();
+      dataset = transformer.apply(jsc, sparkSession, dataset, transformerInfo.getProperties(properties));
+      // Validate after every stage, so a column dropped by one transformer and re-added by a
+      // later one is still reported.
+      ErrorTableUtils.validate(dataset);
+    }
+    return dataset;
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
new file mode 100644
index 00000000000..d86ce15113f
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
+import org.apache.hudi.utilities.transform.Transformer;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_ENABLED;
+import static org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.createStructField;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Tag("functional")
+public class TestErrorTableAwareChainedTransformer extends SparkClientFunctionalTestHarness {
+
+  @Test
+  public void testForErrorTableConfig() {
+    Dataset<Row> original = getTestDataset();
+
+    Transformer t1 = getErrorEventHandlerTransformer();
+    Transformer t2 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("foo", dataset.col("foo").cast(IntegerType));
+    ErrorTableAwareChainedTransformer transformer = new ErrorTableAwareChainedTransformer(Arrays.asList(t1, t2));
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(ERROR_TABLE_ENABLED.key(), "true");
+    Dataset<Row> transformed = transformer.apply(jsc(), spark(), original, properties);
+    List<Row> rows = transformed.collectAsList();
+    assertArrayEquals(new String[]{"foo", ERROR_TABLE_CURRUPT_RECORD_COL_NAME}, transformed.columns());
+    assertEquals(2, transformed.filter(new Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME)
+        .isNotNull()).count());
+    assertEquals(100, rows.get(0).getInt(0));
+    assertEquals(200, rows.get(1).getInt(0));
+  }
+
+  @Test
+  public void testForErrorRecordColumn() {
+    Dataset<Row> original = getTestDataset();
+
+    Transformer t1 = getErrorEventHandlerTransformer();
+    Transformer t2 = getErrorRecordColumnDropTransformer();
+    Transformer t3 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("foo",
+        dataset.col("foo").cast(IntegerType));
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(ERROR_TABLE_ENABLED.key(), "true");
+    ErrorTableAwareChainedTransformer transformer = new ErrorTableAwareChainedTransformer(Arrays.asList(t1, t2, t3));
+    assertThrows(HoodieValidationException.class, () -> transformer.apply(jsc(), spark(), original, properties));
+  }
+
+  private Dataset<Row> getTestDataset() {
+    StructType schema = DataTypes.createStructType(
+        new StructField[]{
+            createStructField("foo", StringType, false)
+        });
+    Row r1 = RowFactory.create("100");
+    Row r2 = RowFactory.create("200");
+    return spark().sqlContext().createDataFrame(Arrays.asList(r1, r2), schema);
+  }
+
+  private Transformer getErrorEventHandlerTransformer() {
+    return (jsc, sparkSession, dataset, properties) -> {
+      boolean isErrorTableEnabledInTransformer = properties.getBoolean(ERROR_TABLE_ENABLED.key(), ERROR_TABLE_ENABLED.defaultValue());
+      if (isErrorTableEnabledInTransformer) {
+        dataset = dataset.withColumn(ERROR_TABLE_CURRUPT_RECORD_COL_NAME, functions.when(new Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME)
+                .isNull(),
+            functions.lit("true")
+        ).otherwise(new Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME)));
+      }
+      return dataset;
+    };
+  }
+
+  private Transformer getErrorRecordColumnDropTransformer() {
+    return (jsc, sparkSession, dataset, properties) -> dataset.select("foo");
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {
+      // empty identifier
+      ":org.apache.hudi.utilities.transform.FlatteningTransformer,T2:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      // same identifier
+      "T1:org.apache.hudi.utilities.transform.FlatteningTransformer,T1:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      // Two colons in transformer config
+      "T1::org.apache.hudi.utilities.transform.FlatteningTransformer",
+      // either all transformers have identifier or none have
+      "org.apache.hudi.utilities.transform.FlatteningTransformer,T1:org.apache.hudi.utilities.transform.FlatteningTransformer"
+  })
+  public void testErrorTableAwareChainedTransformerValidationFails(String transformerName) {
+    assertThrows(IllegalArgumentException.class,
+        () -> new ErrorTableAwareChainedTransformer(Arrays.asList(transformerName.split(","))));
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {
+      "T1:org.apache.hudi.utilities.transform.FlatteningTransformer,T2:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      "T2:org.apache.hudi.utilities.transform.FlatteningTransformer,T1:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      "abc:org.apache.hudi.utilities.transform.FlatteningTransformer,def:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      "org.apache.hudi.utilities.transform.FlatteningTransformer,org.apache.hudi.utilities.transform.FlatteningTransformer"
+  })
+  public void testErrorTableAwareChainedTransformerValidationPasses(String transformerName) {
+    ErrorTableAwareChainedTransformer transformer = new ErrorTableAwareChainedTransformer(Arrays.asList(transformerName.split(",")));
+    assertNotNull(transformer);
+  }
+}

Reply via email to