vinothchandar commented on code in PR #8574:
URL: https://github.com/apache/hudi/pull/8574#discussion_r1176506429


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/Transformer.java:
##########
@@ -45,4 +46,8 @@ public interface Transformer {
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset, TypedProperties properties);
+
+  default Schema schemaTransform(JavaSparkContext jsc, SparkSession 
sparkSession, Schema incomingSchema, TypedProperties properties) {
+    throw new UnsupportedOperationException("Not implemented");

Review Comment:
   Let's throw a specific Hudi exception here?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/Transformer.java:
##########
@@ -45,4 +46,8 @@ public interface Transformer {
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset, TypedProperties properties);
+
+  default Schema schemaTransform(JavaSparkContext jsc, SparkSession 
sparkSession, Schema incomingSchema, TypedProperties properties) {

Review Comment:
   Rename to `transformedSchema()` to indicate it's returning the transformed 
schema. 



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1802,6 +1802,44 @@ private void testParquetDFSSource(boolean 
useSchemaProvider, List<String> transf
     testNum++;
   }
 
+  @Test
+  public void testMultipleTransformers() throws Exception {

Review Comment:
   Let's use this opportunity to pull these into a new Transformer test?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -46,9 +110,40 @@ public List<String> getTransformersNames() {
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
+    Schema incomingSchema = enableSchemaValidation ? sourceSchemaOpt.get() : 
null;
     for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+      String suffix = transformerToPropKeySuffix.get(t);
+      TypedProperties transformerProps = properties;
+      if (StringUtils.nonEmpty(suffix)) {
+        transformerProps = new TypedProperties(properties);
+        Map<String, Object> overrideKeysMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+          String key = (String) entry.getKey();
+          if (key.endsWith("." + suffix)) {
+            overrideKeysMap.put(key.substring(0, key.length() - 
(suffix.length() + 1)), entry.getValue());
+          }
+        }
+        transformerProps.putAll(overrideKeysMap);
+      }
+      dataset = t.apply(jsc, sparkSession, dataset, transformerProps);
+      if (enableSchemaValidation) {
+        incomingSchema = validateAndGetTransformedSchema(t, dataset, 
incomingSchema, jsc, sparkSession, properties);
+      }
     }
     return dataset;
   }
+
+  private Schema validateAndGetTransformedSchema(Transformer transformer, 
Dataset<Row> dataset, Schema incomingSchema,

Review Comment:
   Let's add unit tests (UTs) for all these methods.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -46,9 +110,40 @@ public List<String> getTransformersNames() {
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
+    Schema incomingSchema = enableSchemaValidation ? sourceSchemaOpt.get() : 
null;
     for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+      String suffix = transformerToPropKeySuffix.get(t);

Review Comment:
   Pull this into another method to keep this block easier to read?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -46,9 +110,40 @@ public List<String> getTransformersNames() {
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
+    Schema incomingSchema = enableSchemaValidation ? sourceSchemaOpt.get() : 
null;

Review Comment:
   Let's use `Option` instead of `null` as a sentinel.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -46,9 +110,40 @@ public List<String> getTransformersNames() {
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
+    Schema incomingSchema = enableSchemaValidation ? sourceSchemaOpt.get() : 
null;
     for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+      String suffix = transformerToPropKeySuffix.get(t);
+      TypedProperties transformerProps = properties;
+      if (StringUtils.nonEmpty(suffix)) {
+        transformerProps = new TypedProperties(properties);
+        Map<String, Object> overrideKeysMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+          String key = (String) entry.getKey();
+          if (key.endsWith("." + suffix)) {
+            overrideKeysMap.put(key.substring(0, key.length() - 
(suffix.length() + 1)), entry.getValue());
+          }
+        }
+        transformerProps.putAll(overrideKeysMap);
+      }
+      dataset = t.apply(jsc, sparkSession, dataset, transformerProps);
+      if (enableSchemaValidation) {
+        incomingSchema = validateAndGetTransformedSchema(t, dataset, 
incomingSchema, jsc, sparkSession, properties);

Review Comment:
   Per the current implementation, we will error out here even if the 
`schemaTransform()` method is not implemented?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -46,9 +110,40 @@ public List<String> getTransformersNames() {
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
+    Schema incomingSchema = enableSchemaValidation ? sourceSchemaOpt.get() : 
null;
     for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+      String suffix = transformerToPropKeySuffix.get(t);
+      TypedProperties transformerProps = properties;
+      if (StringUtils.nonEmpty(suffix)) {
+        transformerProps = new TypedProperties(properties);
+        Map<String, Object> overrideKeysMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+          String key = (String) entry.getKey();
+          if (key.endsWith("." + suffix)) {
+            overrideKeysMap.put(key.substring(0, key.length() - 
(suffix.length() + 1)), entry.getValue());
+          }
+        }
+        transformerProps.putAll(overrideKeysMap);
+      }
+      dataset = t.apply(jsc, sparkSession, dataset, transformerProps);
+      if (enableSchemaValidation) {
+        incomingSchema = validateAndGetTransformedSchema(t, dataset, 
incomingSchema, jsc, sparkSession, properties);
+      }
     }
     return dataset;
   }
+
+  private Schema validateAndGetTransformedSchema(Transformer transformer, 
Dataset<Row> dataset, Schema incomingSchema,
+                                               JavaSparkContext jsc, 
SparkSession sparkSession, TypedProperties properties) {
+    Schema targetSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(dataset.schema(), 
incomingSchema.getName(),
+        incomingSchema.getNamespace());
+    Schema expectedTargetSchema = transformer.schemaTransform(jsc, 
sparkSession, incomingSchema, properties);
+    // TODO: Check the API arguments below

Review Comment:
   Please resolve TODOs before we land this.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -18,25 +18,89 @@
 
 package org.apache.hudi.utilities.transform;
 
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 
+import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
  * A {@link Transformer} to chain other {@link Transformer}s and apply 
sequentially.
  */
 public class ChainedTransformer implements Transformer {
 
-  private List<Transformer> transformers;
+  // Delimiter used to separate class name and the property key suffix. The 
suffix comes first.
+  private static final String TRANSFORMER_CLASS_NAME_KEY_SUFFIX_DELIMITER = 
":";
+
+  private final List<Transformer> transformers;
+  private final Map<Transformer, String> transformerToPropKeySuffix;
+  private Option<Schema> sourceSchemaOpt = Option.empty();
+  private boolean enableSchemaValidation = false;
 
   public ChainedTransformer(List<Transformer> transformers) {
     this.transformers = transformers;
+    this.transformerToPropKeySuffix = new HashMap<>(transformers.size());
+    for (Transformer transformer : this.transformers) {
+      transformerToPropKeySuffix.put(transformer, "");
+    }
+  }
+
+  /**
+   * Creates a chained transformer using the input transformer class names. 
The name can also include
+   * a suffix. This suffix can be appended with the property keys to identify 
properties related to the transformer.
+   * E:g - tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer 
can be used along with property key
+   * hoodie.deltastreamer.transformer.sql.tr1. Here tr1 is a suffix used to 
identify the keys specific to this transformer.
+   * This suffix is removed from the configuration keys when the transformer 
is used. This is useful when there are two or more
+   * transformers using the same config keys and expect different values for 
those keys.
+   *
+   * @param sourceSchemaOpt                   Source Schema
+   * @param configuredTransformers            List of configured transformer 
class names.
+   * @param enableSchemaValidation if true, schema is validated for the 
transformed data against expected schema.
+   *                                          Expected schema is provided by 
{@link Transformer#schemaTransform}
+   */
+  public ChainedTransformer(List<String> configuredTransformers, 
Option<Schema> sourceSchemaOpt, boolean enableSchemaValidation) {
+    this.transformerToPropKeySuffix = new 
HashMap<>(configuredTransformers.size());
+    this.transformers = new ArrayList<>(configuredTransformers.size());
+    this.enableSchemaValidation = enableSchemaValidation;
+    this.sourceSchemaOpt = sourceSchemaOpt;
+    if (enableSchemaValidation) {
+      ValidationUtils.checkArgument(sourceSchemaOpt.isPresent(), "Source 
schema should not be null");
+    }
+
+    List<Pair<String, String>> transformerClassNamesToSuffixList = new 
ArrayList<>(configuredTransformers.size());

Review Comment:
   new method + UT?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/Transformer.java:
##########
@@ -45,4 +46,8 @@ public interface Transformer {
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset, TypedProperties properties);
+
+  default Schema schemaTransform(JavaSparkContext jsc, SparkSession 
sparkSession, Schema incomingSchema, TypedProperties properties) {
+    throw new UnsupportedOperationException("Not implemented");

Review Comment:
   Should it return an `Option` instead? Then the calling code in the upper 
layer need not be written in terms of exception handling.
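
A minimal sketch of the `Option`-returning shape, not the PR's code. It assumes `java.util.Optional` as a stand-in for Hudi's `Option` and a plain `String` as a stand-in for the Avro `Schema`; `SchemaAwareTransformer` and `OptionalSchemaSketch` are hypothetical names used only for illustration.

```java
import java.util.Optional;

final class OptionalSchemaSketch {
  // Validates only when the transformer declares an expected schema;
  // otherwise the actual schema is passed through unchecked.
  static String validate(SchemaAwareTransformer t, String incomingSchema, String actualSchema) {
    Optional<String> expected = t.transformedSchema(incomingSchema);
    if (expected.isPresent() && !expected.get().equals(actualSchema)) {
      throw new IllegalStateException(
          "Schema mismatch: expected " + expected.get() + " but got " + actualSchema);
    }
    return actualSchema;
  }

  public static void main(String[] args) {
    // A transformer that does not override the default declares nothing.
    SchemaAwareTransformer undeclared = new SchemaAwareTransformer() {};
    // A transformer that declares its output schema (assumed: appends a column).
    SchemaAwareTransformer declared = new SchemaAwareTransformer() {
      @Override
      public Optional<String> transformedSchema(String incomingSchema) {
        return Optional.of(incomingSchema + "+col");
      }
    };
    System.out.println(validate(undeclared, "s", "anything"));
    System.out.println(validate(declared, "s", "s+col"));
  }
}

// Hypothetical stand-in for the Transformer interface in the diff above.
interface SchemaAwareTransformer {
  // Default: no expected schema is declared, so nothing is thrown.
  default Optional<String> transformedSchema(String incomingSchema) {
    return Optional.empty();
  }
}
```

With this shape the caller branches on presence rather than catching `UnsupportedOperationException`, so transformers that never implement the method simply skip validation.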



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -276,9 +276,19 @@ public static class Config implements Serializable {
             + ". Allows transforming raw source Dataset to a target Dataset 
(conforming to target schema) before "
             + "writing. Default : Not set. E:g - 
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
             + "allows a SQL query templated to be passed as a transformation 
function). "
-            + "Pass a comma-separated list of subclass names to chain the 
transformations.")
+            + "Pass a comma-separated list of subclass names to chain the 
transformations. Transformer can also include "
+            + "a suffix. This suffix can be appended with the property keys to 
identify properties related to the transformer. "
+            + "E:g - 
tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer can be used 
along with property key "
+            + "hoodie.deltastreamer.transformer.sql.tr1. Here tr1 is a suffix 
used to identify the keys specific to this transformer. "

Review Comment:
   It may help to allow such a scenario, though. Consider the following:
   
   ```
   tr1:io.bytearray.TransformerA
   tr2:io.bytearray.TransformerB
   tr3:io.bytearray.TransformerA
   ```
   
   with configs 
   
   ```
   io.bytearray.transformera.x=1
   io.bytearray.transformera.tr1.y=2
   io.bytearray.transformera.tr3.y=3
   ```
   Here, we pass the same value for `x` into tr1/tr3, but a different value 
for `y`? Can we think through this?
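
A rough sketch of how shared and per-id values could resolve, assuming the copy-then-override approach from the `apply()` hunk in this PR. Note the assumption: it uses the id-at-the-end key layout from the PR (`...y.tr1`), not the id-in-the-middle layout written in the example above. `TransformerPropsSketch` and `resolve` are hypothetical names, and `java.util.Properties` stands in for `TypedProperties`.

```java
import java.util.Properties;

final class TransformerPropsSketch {
  // Copies all properties, then for every key ending in "." + id also
  // stores its value under the same key with that suffix stripped.
  static Properties resolve(Properties all, String id) {
    Properties resolved = new Properties();
    resolved.putAll(all);
    if (id == null || id.isEmpty()) {
      return resolved;
    }
    String suffix = "." + id;
    for (String key : all.stringPropertyNames()) {
      if (key.endsWith(suffix)) {
        String baseKey = key.substring(0, key.length() - suffix.length());
        resolved.setProperty(baseKey, all.getProperty(key));
      }
    }
    return resolved;
  }

  public static void main(String[] args) {
    Properties all = new Properties();
    all.setProperty("io.bytearray.transformera.x", "1");     // shared by tr1 and tr3
    all.setProperty("io.bytearray.transformera.y.tr1", "2"); // only for tr1
    all.setProperty("io.bytearray.transformera.y.tr3", "3"); // only for tr3

    for (String id : new String[] {"tr1", "tr3"}) {
      Properties p = resolve(all, id);
      System.out.println(id + ": x=" + p.getProperty("io.bytearray.transformera.x")
          + ", y=" + p.getProperty("io.bytearray.transformera.y"));
    }
  }
}
```

Under these assumptions both transformers see the same `x`, while `y` resolves to the value tagged with each transformer's own id.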



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -276,9 +276,19 @@ public static class Config implements Serializable {
             + ". Allows transforming raw source Dataset to a target Dataset 
(conforming to target schema) before "
             + "writing. Default : Not set. E:g - 
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
             + "allows a SQL query templated to be passed as a transformation 
function). "
-            + "Pass a comma-separated list of subclass names to chain the 
transformations.")
+            + "Pass a comma-separated list of subclass names to chain the 
transformations. Transformer can also include "

Review Comment:
   It's added to the start of the transformer (prefix) and the end of the 
property (suffix)? Let's call this an `id` instead of `suffix`? It's easier to 
understand.
   
   And if I understand correctly, the ordering of transformers should not 
matter. Rename everywhere uniformly? 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -18,25 +18,89 @@
 
 package org.apache.hudi.utilities.transform;
 
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 
+import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
  * A {@link Transformer} to chain other {@link Transformer}s and apply 
sequentially.
  */
 public class ChainedTransformer implements Transformer {
 
-  private List<Transformer> transformers;
+  // Delimiter used to separate class name and the property key suffix. The 
suffix comes first.
+  private static final String TRANSFORMER_CLASS_NAME_KEY_SUFFIX_DELIMITER = 
":";
+
+  private final List<Transformer> transformers;
+  private final Map<Transformer, String> transformerToPropKeySuffix;
+  private Option<Schema> sourceSchemaOpt = Option.empty();
+  private boolean enableSchemaValidation = false;
 
   public ChainedTransformer(List<Transformer> transformers) {
     this.transformers = transformers;
+    this.transformerToPropKeySuffix = new HashMap<>(transformers.size());
+    for (Transformer transformer : this.transformers) {
+      transformerToPropKeySuffix.put(transformer, "");
+    }
+  }
+
+  /**
+   * Creates a chained transformer using the input transformer class names. 
The name can also include
+   * a suffix. This suffix can be appended with the property keys to identify 
properties related to the transformer.
+   * E:g - tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer 
can be used along with property key
+   * hoodie.deltastreamer.transformer.sql.tr1. Here tr1 is a suffix used to 
identify the keys specific to this transformer.
+   * This suffix is removed from the configuration keys when the transformer 
is used. This is useful when there are two or more
+   * transformers using the same config keys and expect different values for 
those keys.
+   *
+   * @param sourceSchemaOpt                   Source Schema
+   * @param configuredTransformers            List of configured transformer 
class names.
+   * @param enableSchemaValidation if true, schema is validated for the 
transformed data against expected schema.
+   *                                          Expected schema is provided by 
{@link Transformer#schemaTransform}
+   */
+  public ChainedTransformer(List<String> configuredTransformers, 
Option<Schema> sourceSchemaOpt, boolean enableSchemaValidation) {
+    this.transformerToPropKeySuffix = new 
HashMap<>(configuredTransformers.size());
+    this.transformers = new ArrayList<>(configuredTransformers.size());
+    this.enableSchemaValidation = enableSchemaValidation;
+    this.sourceSchemaOpt = sourceSchemaOpt;
+    if (enableSchemaValidation) {
+      ValidationUtils.checkArgument(sourceSchemaOpt.isPresent(), "Source 
schema should not be null");
+    }
+
+    List<Pair<String, String>> transformerClassNamesToSuffixList = new 
ArrayList<>(configuredTransformers.size());
+    for (String configuredTransformer : configuredTransformers) {
+      if (!configuredTransformer.contains(":")) {
+        transformerClassNamesToSuffixList.add(Pair.of(configuredTransformer, 
""));
+      } else {
+        String[] splits = configuredTransformer.split(":");
+        if (splits.length > 2) {
+          throw new IllegalArgumentException("There should only be one colon 
in a configured transformer");
+        }
+        transformerClassNamesToSuffixList.add(Pair.of(splits[1], splits[0]));
+      }
+    }
+
+    for (Pair<String, String> pair : transformerClassNamesToSuffixList) {

Review Comment:
   I think we are missing a check for id/suffix uniqueness. We should error out 
if the same suffix is reused. 
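
One possible shape for that check, as a sketch only. It assumes entries of the form `id:className` (or a bare class name with no id), as in the constructor above; `TransformerIdSketch` and `parseIds` are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class TransformerIdSketch {
  // Returns the ids found in "id:className" entries, in order, and
  // rejects entries with more than one colon, empty ids, or reused ids.
  static List<String> parseIds(List<String> configuredTransformers) {
    List<String> ids = new ArrayList<>();
    Set<String> seen = new HashSet<>();
    for (String entry : configuredTransformers) {
      String[] parts = entry.split(":", -1);
      if (parts.length > 2) {
        throw new IllegalArgumentException(
            "There should only be one colon in a configured transformer: " + entry);
      }
      if (parts.length == 1) {
        continue; // bare class name, no id to check
      }
      String id = parts[0];
      if (id.isEmpty()) {
        throw new IllegalArgumentException("Empty transformer id in: " + entry);
      }
      // Set.add returns false when the id was already present.
      if (!seen.add(id)) {
        throw new IllegalArgumentException("Duplicate transformer id: " + id);
      }
      ids.add(id);
    }
    return ids;
  }

  public static void main(String[] args) {
    List<String> configured = new ArrayList<>();
    configured.add("tr1:io.bytearray.TransformerA");
    configured.add("tr2:io.bytearray.TransformerB");
    configured.add("tr3:io.bytearray.TransformerA"); // same class, different id: allowed
    System.out.println(parseIds(configured));
  }
}
```

Reusing a class under different ids stays legal here; only a repeated id is rejected.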



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -276,9 +276,19 @@ public static class Config implements Serializable {
             + ". Allows transforming raw source Dataset to a target Dataset 
(conforming to target schema) before "
             + "writing. Default : Not set. E:g - 
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
             + "allows a SQL query templated to be passed as a transformation 
function). "
-            + "Pass a comma-separated list of subclass names to chain the 
transformations.")
+            + "Pass a comma-separated list of subclass names to chain the 
transformations. Transformer can also include "
+            + "a suffix. This suffix can be appended with the property keys to 
identify properties related to the transformer. "
+            + "E:g - 
tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer can be used 
along with property key "
+            + "hoodie.deltastreamer.transformer.sql.tr1. Here tr1 is a suffix 
used to identify the keys specific to this transformer. "

Review Comment:
   Can a user have a suffix/id in the transformer class name and not in any 
property? Let's clarify such behavior in the docs in either case, and also have 
checks to guard against misconfiguration. 
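
A sketch of one such guard, under the assumption that an id counts as "used" when at least one property key ends with `.` followed by that id. Whether an unused id should be an error or just a warning is left open; the helper below only reports them. `UnusedIdSketch` and `idsWithoutProperties` are hypothetical names, and `java.util.Properties` stands in for `TypedProperties`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

final class UnusedIdSketch {
  // Returns the ids for which no property key ends with "." + id.
  static List<String> idsWithoutProperties(List<String> ids, Properties props) {
    List<String> unused = new ArrayList<>();
    for (String id : ids) {
      boolean used = false;
      for (String key : props.stringPropertyNames()) {
        if (key.endsWith("." + id)) {
          used = true;
          break;
        }
      }
      if (!used) {
        unused.add(id);
      }
    }
    return unused;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.deltastreamer.transformer.sql.tr1", "SELECT * FROM <SRC>");
    // tr2 is declared on a transformer but no property carries it.
    System.out.println(idsWithoutProperties(Arrays.asList("tr1", "tr2"), props));
  }
}
```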



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
