This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0b87e143cfe [HUDI-6115] Adding hardening checks for transformer output
schema for quarantine enabled/disabled (#8520)
0b87e143cfe is described below
commit 0b87e143cfe237ddc005f610d208d1bf36432ba3
Author: harshal <[email protected]>
AuthorDate: Fri May 19 07:02:11 2023 +0530
[HUDI-6115] Adding hardening checks for transformer output schema for
quarantine enabled/disabled (#8520)
- Adds ERROR_TABLE_CURRUPT_RECORD_COL_NAME as a null-valued column
before the transformers run, when the error table is enabled and the
column does not already exist in the dataset.
- Adds validation that the ERROR_TABLE_CURRUPT_RECORD_COL_NAME
column is still present in the output of every transformer in the
chain when the error table is enabled.
---
.../org/apache/hudi/utilities/UtilHelpers.java | 7 +-
.../hudi/utilities/deltastreamer/DeltaSync.java | 4 +-
.../utilities/deltastreamer/ErrorTableUtils.java | 33 ++++-
.../utilities/transform/ChainedTransformer.java | 8 +-
.../ErrorTableAwareChainedTransformer.java | 58 ++++++++
.../TestErrorTableAwareChainedTransformer.java | 150 +++++++++++++++++++++
6 files changed, 250 insertions(+), 10 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 721ba2eb9f4..16ed7eadc1f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -61,6 +61,7 @@ import
org.apache.hudi.utilities.schema.postprocessor.ChainedSchemaPostProcessor
import org.apache.hudi.utilities.sources.Source;
import
org.apache.hudi.utilities.sources.processor.ChainedJsonKafkaSourcePostProcessor;
import
org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
+import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
@@ -190,9 +191,11 @@ public class UtilHelpers {
}
- public static Option<Transformer> createTransformer(Option<List<String>>
classNamesOpt) throws IOException {
+ public static Option<Transformer> createTransformer(Option<List<String>>
classNamesOpt, Boolean isErrorTableWriterEnabled) throws IOException {
try {
- return classNamesOpt.map(classNames -> classNames.isEmpty() ? null : new
ChainedTransformer(classNames));
+ return classNamesOpt.map(classNames -> classNames.isEmpty() ? null :
+ isErrorTableWriterEnabled ? new
ErrorTableAwareChainedTransformer(classNames) : new
ChainedTransformer(classNames)
+ );
} catch (Throwable e) {
throw new IOException("Could not load transformer class(es) " +
classNamesOpt.get(), e);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 7d1d0758955..cbd19305e41 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -289,7 +289,6 @@ public class DeltaSync implements Serializable, Closeable {
// Register User Provided schema first
registerAvroSchemas(schemaProvider);
- this.transformer =
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames));
this.metrics = (HoodieIngestionMetrics)
ReflectionUtils.loadClass(cfg.ingestionMetricsClass,
getHoodieClientConfig(this.schemaProvider));
this.hoodieMetrics = new
HoodieMetrics(getHoodieClientConfig(this.schemaProvider));
@@ -306,6 +305,9 @@ public class DeltaSync implements Serializable, Closeable {
this.formatAdapter = new SourceFormatAdapter(
UtilHelpers.createSource(cfg.sourceClassName, props, jssc,
sparkSession, schemaProvider, metrics),
this.errorTableWriter, Option.of(props));
+
+ this.transformer =
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames),
this.errorTableWriter.isPresent());
+
}
/**
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
index 881a9545461..76e7b030b6f 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorTableUtils.java
@@ -28,24 +28,29 @@ import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.exception.HoodieValidationException;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import static org.apache.spark.sql.functions.lit;
+
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import static
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_WRITE_CLASS;
import static
org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_WRITE_FAILURE_STRATEGY;
+import static
org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME;
public final class ErrorTableUtils {
-
public static Option<BaseErrorTableWriter>
getErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
- TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+
TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
String errorTableWriterClass =
props.getString(ERROR_TABLE_WRITE_CLASS.key());
ValidationUtils.checkState(!StringUtils.isNullOrEmpty(errorTableWriterClass),
"Missing error table config " + ERROR_TABLE_WRITE_CLASS);
- Class<?>[] argClassArr = new Class[] {HoodieDeltaStreamer.Config.class,
+ Class<?>[] argClassArr = new Class[]{HoodieDeltaStreamer.Config.class,
SparkSession.class, TypedProperties.class, JavaSparkContext.class,
FileSystem.class};
String errMsg = "Unable to instantiate ErrorTableWriter with arguments
type " + Arrays.toString(argClassArr);
ValidationUtils.checkArgument(ReflectionUtils.hasConstructor(BaseErrorTableWriter.class.getName(),
argClassArr, false), errMsg);
@@ -63,4 +68,44 @@ public final class ErrorTableUtils {
     String writeFailureStrategy = props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key());
     return HoodieErrorTableConfig.ErrorWriteFailureStrategy.valueOf(writeFailureStrategy);
   }
+
+  /**
+   * Validates that the error table corrupt record column
+   * ({@code ERROR_TABLE_CURRUPT_RECORD_COL_NAME}) is present in the given dataset.
+   *
+   * @param dataset dataset to check, e.g. the output of a transformer.
+   * @throws HoodieValidationException if the column is missing.
+   */
+  public static void validate(Dataset<Row> dataset) {
+    if (!isErrorTableCorruptRecordColumnPresent(dataset)) {
+      throw new HoodieValidationException(String.format("Invalid condition, columnName=%s "
+          + "is not present in transformer " + "output schema", ERROR_TABLE_CURRUPT_RECORD_COL_NAME));
+    }
+  }
+
+  /**
+   * Adds the error table corrupt record column, filled with nulls, if the dataset does not
+   * already contain it; otherwise the dataset is returned unchanged.
+   *
+   * <p>The null literal is cast to string so the added column gets a concrete type. A bare
+   * {@code lit(null)} produces a column of Spark's {@code NullType}, which some writers (for
+   * example the Parquet data source) reject.
+   *
+   * @param dataset input dataset.
+   * @return a dataset that contains the corrupt record column.
+   */
+  public static Dataset<Row> addNullValueErrorTableCorruptRecordColumn(Dataset<Row> dataset) {
+    if (isErrorTableCorruptRecordColumnPresent(dataset)) {
+      return dataset;
+    }
+    return dataset.withColumn(ERROR_TABLE_CURRUPT_RECORD_COL_NAME, lit(null).cast("string"));
+  }
+
+  /**
+   * Returns whether the dataset has a top-level column whose name equals
+   * {@code ERROR_TABLE_CURRUPT_RECORD_COL_NAME} (case-sensitive comparison).
+   */
+  private static boolean isErrorTableCorruptRecordColumnPresent(Dataset<Row> dataset) {
+    return Arrays.asList(dataset.columns()).contains(ERROR_TABLE_CURRUPT_RECORD_COL_NAME);
+  }
 }
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
index 22100563204..cbc66ff4be7 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
@@ -46,7 +46,7 @@ public class ChainedTransformer implements Transformer {
// Delimiter used to separate class name and the property key suffix. The
suffix comes first.
private static final String ID_TRANSFORMER_CLASS_NAME_DELIMITER = ":";
- private final List<TransformerInfo> transformers;
+ protected final List<TransformerInfo> transformers;
public ChainedTransformer(List<Transformer> transformersList) {
this.transformers = new ArrayList<>(transformersList.size());
@@ -109,7 +109,7 @@ public class ChainedTransformer implements Transformer {
}
}
- private static class TransformerInfo {
+ protected static class TransformerInfo {
private final Transformer transformer;
private final Option<String> idOpt;
@@ -123,7 +123,7 @@ public class ChainedTransformer implements Transformer {
this.idOpt = Option.empty();
}
- private Transformer getTransformer() {
+ protected Transformer getTransformer() {
return transformer;
}
@@ -131,7 +131,7 @@ public class ChainedTransformer implements Transformer {
return idOpt.isPresent();
}
- private TypedProperties getProperties(TypedProperties properties) {
+ protected TypedProperties getProperties(TypedProperties properties) {
TypedProperties transformerProps = properties;
if (idOpt.isPresent()) {
// Transformer specific property keys end with the id associated with
the transformer.
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
new file mode 100644
index 00000000000..673be917c9e
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.deltastreamer.ErrorTableUtils;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.List;
+
+/**
+ * A {@link ChainedTransformer} that keeps the error table corrupt record column intact across
+ * the whole chain.
+ *
+ * <p>Before the first transformer runs, the corrupt record column is added as a null-valued
+ * column if the input does not already contain it. After every transformer the output is
+ * validated to still contain that column, so no transformer may drop it, even if a later
+ * transformer would add it back.
+ */
+public class ErrorTableAwareChainedTransformer extends ChainedTransformer {
+
+  /**
+   * Builds the chain from configured transformer class names; see {@link ChainedTransformer}
+   * for the accepted format.
+   *
+   * @param configuredTransformers transformer class names, in the order they are applied.
+   * @param ignore unused; only makes this constructor's erasure differ from the
+   *     {@code List<Transformer>} constructor.
+   */
+  public ErrorTableAwareChainedTransformer(List<String> configuredTransformers, int... ignore) {
+    super(configuredTransformers);
+  }
+
+  /**
+   * Builds the chain from already-instantiated transformers.
+   *
+   * @param transformers transformers, in the order they are applied.
+   */
+  public ErrorTableAwareChainedTransformer(List<Transformer> transformers) {
+    super(transformers);
+  }
+
+  /**
+   * Runs every transformer in order on a dataset that is guaranteed to contain the corrupt record
+   * column, checking after each stage that the column is still present.
+   *
+   * @throws org.apache.hudi.exception.HoodieValidationException if a transformer's output does
+   *     not contain the corrupt record column.
+   */
+  @Override
+  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
+                            TypedProperties properties) {
+    Dataset<Row> current = ErrorTableUtils.addNullValueErrorTableCorruptRecordColumn(rowDataset);
+    for (TransformerInfo stage : transformers) {
+      Transformer delegate = stage.getTransformer();
+      current = delegate.apply(jsc, sparkSession, current, stage.getProperties(properties));
+      // Validate after every stage rather than once at the end, so that a column dropped by one
+      // transformer and re-added by a later one is still reported.
+      ErrorTableUtils.validate(current);
+    }
+    return current;
+  }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
new file mode 100644
index 00000000000..d86ce15113f
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestErrorTableAwareChainedTransformer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
+import org.apache.hudi.utilities.transform.Transformer;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_ENABLED;
+import static org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.createStructField;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Functional tests for {@link ErrorTableAwareChainedTransformer}: the error table corrupt record
+ * column is added when absent, a transformer that drops it fails validation, and malformed
+ * transformer configuration strings are rejected.
+ */
+@Tag("functional")
+public class TestErrorTableAwareChainedTransformer extends SparkClientFunctionalTestHarness {
+
+  @Test
+  public void testForErrorTableConfig() {
+    Dataset<Row> original = getTestDataset();
+
+    Transformer t1 = getErrorEventHandlerTransformer();
+    Transformer t2 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("foo", dataset.col("foo").cast(IntegerType));
+    ErrorTableAwareChainedTransformer transformer = new ErrorTableAwareChainedTransformer(Arrays.asList(t1, t2));
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(ERROR_TABLE_ENABLED.key(), "true");
+    Dataset<Row> transformed = transformer.apply(jsc(), spark(), original, properties);
+    List<Row> rows = transformed.collectAsList();
+    assertArrayEquals(new String[] {"foo", ERROR_TABLE_CURRUPT_RECORD_COL_NAME}, transformed.columns());
+    assertEquals(2L, transformed.filter(new Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME).isNotNull()).count());
+    assertEquals(100, rows.get(0).getInt(0));
+    assertEquals(200, rows.get(1).getInt(0));
+  }
+
+  @Test
+  public void testErrorRecordColumnAddedWhenAbsent() {
+    // No transformer touches the corrupt record column, so it stays as the null column the chain adds.
+    Transformer identity = (jsc, sparkSession, dataset, properties) -> dataset;
+    ErrorTableAwareChainedTransformer transformer = new ErrorTableAwareChainedTransformer(Arrays.asList(identity));
+    Dataset<Row> transformed = transformer.apply(jsc(), spark(), getTestDataset(), new TypedProperties());
+    assertArrayEquals(new String[] {"foo", ERROR_TABLE_CURRUPT_RECORD_COL_NAME}, transformed.columns());
+    assertEquals(0L, transformed.filter(new Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME).isNotNull()).count());
+  }
+
+  @Test
+  public void testForErrorRecordColumn() {
+    Dataset<Row> original = getTestDataset();
+
+    Transformer t1 = getErrorEventHandlerTransformer();
+    Transformer t2 = getErrorRecordColumnDropTransformer();
+    Transformer t3 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("foo", dataset.col("foo").cast(IntegerType));
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(ERROR_TABLE_ENABLED.key(), "true");
+    ErrorTableAwareChainedTransformer transformer = new ErrorTableAwareChainedTransformer(Arrays.asList(t1, t2, t3));
+    // t2 drops the corrupt record column, so validation after t2 must fail.
+    assertThrows(HoodieValidationException.class, () -> transformer.apply(jsc(), spark(), original, properties));
+  }
+
+  private Dataset<Row> getTestDataset() {
+    StructType schema = DataTypes.createStructType(new StructField[] {createStructField("foo", StringType, false)});
+    Row r1 = RowFactory.create("100");
+    Row r2 = RowFactory.create("200");
+    return spark().sqlContext().createDataFrame(Arrays.asList(r1, r2), schema);
+  }
+
+  /**
+   * Returns a transformer that, when the error table is enabled in the properties, replaces null
+   * values of the corrupt record column with {@code "true"}.
+   */
+  private Transformer getErrorEventHandlerTransformer() {
+    return (jsc, sparkSession, dataset, properties) -> {
+      boolean isErrorTableEnabledInTransformer = properties.getBoolean(ERROR_TABLE_ENABLED.key(), ERROR_TABLE_ENABLED.defaultValue());
+      if (isErrorTableEnabledInTransformer) {
+        Column corruptRecordCol = new Column(ERROR_TABLE_CURRUPT_RECORD_COL_NAME);
+        dataset = dataset.withColumn(ERROR_TABLE_CURRUPT_RECORD_COL_NAME,
+            functions.when(corruptRecordCol.isNull(), functions.lit("true")).otherwise(corruptRecordCol));
+      }
+      return dataset;
+    };
+  }
+
+  /** Returns a transformer that keeps only the {@code foo} column, dropping the corrupt record column. */
+  private Transformer getErrorRecordColumnDropTransformer() {
+    return (jsc, sparkSession, dataset, properties) -> dataset.select("foo");
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {
+      // empty identifier
+      ":org.apache.hudi.utilities.transform.FlatteningTransformer,T2:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      // same identifier
+      "T1:org.apache.hudi.utilities.transform.FlatteningTransformer,T1:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      // Two colons in transformer config
+      "T1::org.apache.hudi.utilities.transform.FlatteningTransformer",
+      // either all transformers have identifier or none have
+      "org.apache.hudi.utilities.transform.FlatteningTransformer,T1:org.apache.hudi.utilities.transform.FlatteningTransformer"
+  })
+  public void testErrorTableAwareChainedTransformerValidationFails(String transformerName) {
+    List<String> configuredTransformers = Arrays.asList(transformerName.split(","));
+    assertThrows(IllegalArgumentException.class, () -> new ErrorTableAwareChainedTransformer(configuredTransformers));
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {
+      "T1:org.apache.hudi.utilities.transform.FlatteningTransformer,T2:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      "T2:org.apache.hudi.utilities.transform.FlatteningTransformer,T1:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      "abc:org.apache.hudi.utilities.transform.FlatteningTransformer,def:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      "org.apache.hudi.utilities.transform.FlatteningTransformer,org.apache.hudi.utilities.transform.FlatteningTransformer"
+  })
+  public void testErrorTableAwareChainedTransformerValidationPasses(String transformerName) {
+    ErrorTableAwareChainedTransformer transformer = new ErrorTableAwareChainedTransformer(Arrays.asList(transformerName.split(",")));
+    assertNotNull(transformer);
+  }
+}