the-other-tim-brown commented on code in PR #8574:
URL: https://github.com/apache/hudi/pull/8574#discussion_r1191166480
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java:
##########
@@ -79,6 +96,71 @@ public void testMultipleTransformersWithIdentifiers() throws
Exception {
assertEquals(0,
sqlContext.read().format("org.apache.hudi").load(tableBasePath).where("timestamp
!= 110").count());
}
+ @Test
+ public void testTransformerSchemaValidation() throws Exception {
+ List<String> transformerClassNames = Arrays.asList(
+ FlatteningTransformerWithTransformedSchema.class.getName(),
+ TimestampTransformer.class.getName(),
+ AddColumnTransformerWithTransformedSchema.class.getName());
+ runDeltaStreamerWithTransformers(transformerClassNames);
+ }
+
+ @Test
+ public void testTransformerSchemaValidationFails() throws Exception {
+ String errorMsg = "Expected target schema not provided for transformer";
+ List<String> transformerClassNames = Arrays.asList(
+ FlatteningTransformer.class.getName(),
+ TimestampTransformer.class.getName(),
+ AddColumnTransformerWithTransformedSchema.class.getName());
+ testTransformerSchemaValidationFails(transformerClassNames, errorMsg);
+
+ transformerClassNames = Arrays.asList(
+ FlatteningTransformerWithTransformedSchema.class.getName(),
+ TimestampTransformer.class.getName(),
+ AddColumnTransformer.class.getName());
+ testTransformerSchemaValidationFails(transformerClassNames, errorMsg);
+
+ errorMsg = "Schema of transformed data does not match expected schema";
+ transformerClassNames = Arrays.asList(
+ FlatteningTransformerWithTransformedSchema.class.getName(),
+ TimestampTransformer.class.getName(),
+ AddColumnTransformerWithWrongTransformedSchema.class.getName());
+ testTransformerSchemaValidationFails(transformerClassNames, errorMsg);
+ }
+
+ private void testTransformerSchemaValidationFails(List<String>
transformerClasses, String errorMsg) {
+ try {
+ runDeltaStreamerWithTransformers(transformerClasses);
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains(errorMsg), e.getMessage());
+ }
+ }
+
+ private void runDeltaStreamerWithTransformers(List<String>
transformerClassNames) throws Exception {
+ // Create source using TRIP_EXAMPLE_SCHEMA
+ boolean useSchemaProvider = true;
+ PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+ int parquetRecordsCount = 10;
+ prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT,
FIRST_PARQUET_FILE_NAME, false, null, null);
+ prepareParquetDFSSource(useSchemaProvider, true, "source.avsc",
"target-flattened-addcolumn-transformer.avsc", PROPS_FILENAME_TEST_PARQUET,
+ PARQUET_SOURCE_ROOT, false, "partition_path", "");
+ String tableBasePath = basePath + "/testTransformerSchemaValidation" +
testNum;
+ HoodieDeltaStreamer.Config config =
TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath,
WriteOperationType.INSERT,
+ ParquetDFSSource.class.getName(), transformerClassNames,
PROPS_FILENAME_TEST_PARQUET, false, useSchemaProvider,
+ 100000, false, null, null, "timestamp", null);
+ config.enableTransformerSchemaValidation = true;
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc);
+ Properties properties = ((HoodieDeltaStreamer.DeltaSyncService)
deltaStreamer.getIngestionService()).getProps();
+ properties.setProperty("timestamp.transformer.increment", "20");
+ properties.setProperty("timestamp.transformer.multiplier", "2");
+
+ deltaStreamer.sync();
+ TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(parquetRecordsCount,
tableBasePath, sqlContext);
+ sqlContext.read().format("org.apache.hudi").load(tableBasePath).show();
Review Comment:
What value does `.show()` provide here?
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java:
##########
@@ -79,6 +96,71 @@ public void testMultipleTransformersWithIdentifiers() throws
Exception {
assertEquals(0,
sqlContext.read().format("org.apache.hudi").load(tableBasePath).where("timestamp
!= 110").count());
}
+ @Test
+ public void testTransformerSchemaValidation() throws Exception {
+ List<String> transformerClassNames = Arrays.asList(
+ FlatteningTransformerWithTransformedSchema.class.getName(),
+ TimestampTransformer.class.getName(),
+ AddColumnTransformerWithTransformedSchema.class.getName());
+ runDeltaStreamerWithTransformers(transformerClassNames);
+ }
+
+ @Test
+ public void testTransformerSchemaValidationFails() throws Exception {
+ String errorMsg = "Expected target schema not provided for transformer";
+ List<String> transformerClassNames = Arrays.asList(
+ FlatteningTransformer.class.getName(),
+ TimestampTransformer.class.getName(),
+ AddColumnTransformerWithTransformedSchema.class.getName());
+ testTransformerSchemaValidationFails(transformerClassNames, errorMsg);
+
+ transformerClassNames = Arrays.asList(
+ FlatteningTransformerWithTransformedSchema.class.getName(),
+ TimestampTransformer.class.getName(),
+ AddColumnTransformer.class.getName());
+ testTransformerSchemaValidationFails(transformerClassNames, errorMsg);
+
+ errorMsg = "Schema of transformed data does not match expected schema";
+ transformerClassNames = Arrays.asList(
+ FlatteningTransformerWithTransformedSchema.class.getName(),
+ TimestampTransformer.class.getName(),
+ AddColumnTransformerWithWrongTransformedSchema.class.getName());
+ testTransformerSchemaValidationFails(transformerClassNames, errorMsg);
+ }
+
+ private void testTransformerSchemaValidationFails(List<String>
transformerClasses, String errorMsg) {
+ try {
Review Comment:
You can use assertThrows from JUnit for this:
https://junit.org/junit5/docs/5.8.2/api/org.junit.jupiter.api/org/junit/jupiter/api/Assertions.html#assertThrows(java.lang.Class,org.junit.jupiter.api.function.Executable,java.lang.String)
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java:
##########
@@ -79,6 +96,71 @@ public void testMultipleTransformersWithIdentifiers() throws
Exception {
assertEquals(0,
sqlContext.read().format("org.apache.hudi").load(tableBasePath).where("timestamp
!= 110").count());
}
+ @Test
+ public void testTransformerSchemaValidation() throws Exception {
+ List<String> transformerClassNames = Arrays.asList(
+ FlatteningTransformerWithTransformedSchema.class.getName(),
+ TimestampTransformer.class.getName(),
+ AddColumnTransformerWithTransformedSchema.class.getName());
+ runDeltaStreamerWithTransformers(transformerClassNames);
+ }
+
+ @Test
+ public void testTransformerSchemaValidationFails() throws Exception {
Review Comment:
Should we break this into the different cases that are being tested so it is
clear what failure mode is being covered?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -59,11 +65,15 @@ public ChainedTransformer(List<Transformer>
transformersList) {
* Creates a chained transformer using the input transformer class names.
Refer {@link HoodieDeltaStreamer.Config#transformerClassNames}
* for more information on how the transformers can be configured.
*
- * @param configuredTransformers List of configured transformer class names.
- * @param ignore Added for avoiding two methods with same erasure. Ignored.
+ * @param sourceSchemaOpt Source Schema
Review Comment:
Can we add some context like "Schema from the dataset the transform is
applied to"?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -47,6 +51,8 @@ public class ChainedTransformer implements Transformer {
private static final String ID_TRANSFORMER_CLASS_NAME_DELIMITER = ":";
private final List<TransformerInfo> transformers;
+ private Option<Schema> sourceSchemaOpt = Option.empty();
Review Comment:
Make these final?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -47,6 +51,8 @@ public class ChainedTransformer implements Transformer {
private static final String ID_TRANSFORMER_CLASS_NAME_DELIMITER = ":";
private final List<TransformerInfo> transformers;
+ private Option<Schema> sourceSchemaOpt = Option.empty();
+ private boolean enableSchemaValidation = false;
public ChainedTransformer(List<Transformer> transformersList) {
Review Comment:
Should this entry point also support schema validation?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -285,7 +285,8 @@ public DeltaSync(HoodieDeltaStreamer.Config cfg,
SparkSession sparkSession, Sche
// Register User Provided schema first
registerAvroSchemas(schemaProvider);
- this.transformer =
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames));
+ this.transformer =
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames),
Review Comment:
I want to call out that by fetching the source schema here, we are doing a
one-time schema validation. I'm trying to think through the impact of this when
columns are added/dropped. This may be fine, but I wanted to make sure it was an
intentional decision.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -93,9 +103,13 @@ public List<String> getTransformersNames() {
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
Dataset<Row> rowDataset, TypedProperties properties) {
Dataset<Row> dataset = rowDataset;
+ Option<Schema> incomingSchemaOpt = sourceSchemaOpt;
for (TransformerInfo transformerInfo : transformers) {
Transformer transformer = transformerInfo.getTransformer();
dataset = transformer.apply(jsc, sparkSession, dataset,
transformerInfo.getProperties(properties));
+ if (enableSchemaValidation) {
+ incomingSchemaOpt = validateAndGetTransformedSchema(transformerInfo,
dataset, incomingSchemaOpt, jsc, sparkSession, properties);
Review Comment:
Why are we setting the incomingSchemaOpt to the targetSchemaOpt returned by
the method here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]