lokeshj1703 commented on code in PR #8514:
URL: https://github.com/apache/hudi/pull/8514#discussion_r1183341328


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -19,36 +19,137 @@
 package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
  * A {@link Transformer} to chain other {@link Transformer}s and apply 
sequentially.
  */
 public class ChainedTransformer implements Transformer {
 
-  private List<Transformer> transformers;
+  // Delimiter used to separate class name and the property key suffix. The 
suffix comes first.
+  private static final String TRANSFORMER_CLASS_NAME_ID_DELIMITER = ":";
 
-  public ChainedTransformer(List<Transformer> transformers) {
-    this.transformers = transformers;
+  private final List<TransformerInfo> transformers;
+
+  public ChainedTransformer(List<Transformer> transformersList) {
+    this.transformers = new ArrayList<>(transformersList.size());
+    for (Transformer transformer : transformersList) {
+      this.transformers.add(new TransformerInfo(transformer));
+    }
+  }
+
+  /**
+   * Creates a chained transformer using the input transformer class names. 
Refer {@link HoodieDeltaStreamer.Config#transformerClassNames}
+   * for more information on how the transformers can be configured.
+   *
+   * @param configuredTransformers List of configured transformer class names.
+   * @param ignore Added for avoiding two methods with same erasure. Ignored.
+   */
+  public ChainedTransformer(List<String> configuredTransformers, int... 
ignore) {
+    this.transformers = new ArrayList<>(configuredTransformers.size());
+
+    Set<String> identifiers = new HashSet<>();
+    for (String configuredTransformer : configuredTransformers) {
+      if 
(!configuredTransformer.contains(TRANSFORMER_CLASS_NAME_ID_DELIMITER)) {
+        transformers.add(new 
TransformerInfo(ReflectionUtils.loadClass(configuredTransformer)));
+      } else {
+        String[] splits = 
configuredTransformer.split(TRANSFORMER_CLASS_NAME_ID_DELIMITER);
+        if (splits.length > 2) {
+          throw new IllegalArgumentException("There should only be one colon 
in a configured transformer");
+        }
+        String id = splits[0];
+        validateIdentifier(id, identifiers, configuredTransformer);
+        Transformer transformer = ReflectionUtils.loadClass(splits[1]);
+        transformers.add(new TransformerInfo(transformer, id));
+      }
+    }
+
+    
ValidationUtils.checkArgument(transformers.stream().allMatch(TransformerInfo::hasIdentifier)
+            || transformers.stream().noneMatch(TransformerInfo::hasIdentifier),
+        "Either all transformers should have identifier or none should");
   }
 
   public List<String> getTransformersNames() {
-    return transformers.stream().map(t -> 
t.getClass().getName()).collect(Collectors.toList());
+    return transformers.stream().map(t -> 
t.getTransformer().getClass().getName()).collect(Collectors.toList());
   }
 
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
-    for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+    for (TransformerInfo transformerInfo : transformers) {
+      Transformer transformer = transformerInfo.getTransformer();
+      dataset = transformer.apply(jsc, sparkSession, dataset, 
transformerInfo.getProperties(properties));
     }
     return dataset;
   }
+
+  private void validateIdentifier(String id, Set<String> identifiers, String 
configuredTransformer) {
+    ValidationUtils.checkArgument(StringUtils.nonEmpty(id), 
String.format("Transformer identifier is empty for %s", configuredTransformer));
+    if (identifiers.contains(id)) {
+      throw new IllegalArgumentException(String.format("Duplicate identifier 
%s found for transformer %s", id, configuredTransformer));
+    } else {
+      identifiers.add(id);
+    }
+  }
+
+  private static class TransformerInfo {
+    private final Transformer transformer;
+    private final Option<String> idOpt;
+
+    private TransformerInfo(Transformer transformer, String idOpt) {
+      this.transformer = transformer;
+      this.idOpt = Option.of(idOpt);
+    }
+
+    private TransformerInfo(Transformer transformer) {
+      this.transformer = transformer;
+      this.idOpt = Option.empty();
+    }
+
+    private Transformer getTransformer() {
+      return transformer;
+    }
+
+    private boolean hasIdentifier() {
+      return idOpt.isPresent();
+    }
+
+    private TypedProperties getProperties(TypedProperties properties) {
+      TypedProperties transformerProps = properties;
+      if (idOpt.isPresent()) {
+        // Transformer specific property keys end with the id associated with 
the transformer.
+        // Ex. For id tr1, key `hoodie.deltastreamer.transformer.sql.tr1` 
would be converted to
+        // `hoodie.deltastreamer.transformer.sql` and then passed to the 
transformer.
+        String id = idOpt.get();
+        transformerProps = new TypedProperties(properties);
+        Map<String, Object> overrideKeysMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+          String key = (String) entry.getKey();
+          if (key.endsWith("." + id)) {

Review Comment:
   No, we won't need that because the other untrimmed props would not be queried 
by the transformer.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -19,36 +19,137 @@
 package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
  * A {@link Transformer} to chain other {@link Transformer}s and apply 
sequentially.
  */
 public class ChainedTransformer implements Transformer {
 
-  private List<Transformer> transformers;
+  // Delimiter used to separate class name and the property key suffix. The 
suffix comes first.
+  private static final String TRANSFORMER_CLASS_NAME_ID_DELIMITER = ":";
 
-  public ChainedTransformer(List<Transformer> transformers) {
-    this.transformers = transformers;
+  private final List<TransformerInfo> transformers;
+
+  public ChainedTransformer(List<Transformer> transformersList) {
+    this.transformers = new ArrayList<>(transformersList.size());
+    for (Transformer transformer : transformersList) {
+      this.transformers.add(new TransformerInfo(transformer));
+    }
+  }
+
+  /**
+   * Creates a chained transformer using the input transformer class names. 
Refer {@link HoodieDeltaStreamer.Config#transformerClassNames}
+   * for more information on how the transformers can be configured.
+   *
+   * @param configuredTransformers List of configured transformer class names.
+   * @param ignore Added for avoiding two methods with same erasure. Ignored.
+   */
+  public ChainedTransformer(List<String> configuredTransformers, int... 
ignore) {
+    this.transformers = new ArrayList<>(configuredTransformers.size());
+
+    Set<String> identifiers = new HashSet<>();
+    for (String configuredTransformer : configuredTransformers) {
+      if 
(!configuredTransformer.contains(TRANSFORMER_CLASS_NAME_ID_DELIMITER)) {
+        transformers.add(new 
TransformerInfo(ReflectionUtils.loadClass(configuredTransformer)));
+      } else {
+        String[] splits = 
configuredTransformer.split(TRANSFORMER_CLASS_NAME_ID_DELIMITER);
+        if (splits.length > 2) {
+          throw new IllegalArgumentException("There should only be one colon 
in a configured transformer");
+        }
+        String id = splits[0];
+        validateIdentifier(id, identifiers, configuredTransformer);
+        Transformer transformer = ReflectionUtils.loadClass(splits[1]);
+        transformers.add(new TransformerInfo(transformer, id));
+      }
+    }
+
+    
ValidationUtils.checkArgument(transformers.stream().allMatch(TransformerInfo::hasIdentifier)
+            || transformers.stream().noneMatch(TransformerInfo::hasIdentifier),
+        "Either all transformers should have identifier or none should");
   }
 
   public List<String> getTransformersNames() {
-    return transformers.stream().map(t -> 
t.getClass().getName()).collect(Collectors.toList());
+    return transformers.stream().map(t -> 
t.getTransformer().getClass().getName()).collect(Collectors.toList());
   }
 
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
-    for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+    for (TransformerInfo transformerInfo : transformers) {
+      Transformer transformer = transformerInfo.getTransformer();
+      dataset = transformer.apply(jsc, sparkSession, dataset, 
transformerInfo.getProperties(properties));
     }
     return dataset;
   }
+
+  private void validateIdentifier(String id, Set<String> identifiers, String 
configuredTransformer) {
+    ValidationUtils.checkArgument(StringUtils.nonEmpty(id), 
String.format("Transformer identifier is empty for %s", configuredTransformer));
+    if (identifiers.contains(id)) {
+      throw new IllegalArgumentException(String.format("Duplicate identifier 
%s found for transformer %s", id, configuredTransformer));
+    } else {
+      identifiers.add(id);
+    }
+  }
+
+  private static class TransformerInfo {
+    private final Transformer transformer;
+    private final Option<String> idOpt;
+
+    private TransformerInfo(Transformer transformer, String idOpt) {

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to