This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f41e22eb3b [HUDI-6113] Support multiple transformers using the same 
config keys in DeltaStreamer (#8514)
7f41e22eb3b is described below

commit 7f41e22eb3bfb38f9619a251930f771d537b3bb7
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed May 3 20:55:59 2023 +0530

    [HUDI-6113] Support multiple transformers using the same config keys in 
DeltaStreamer (#8514)
    
    - Currently DeltaStreamer supports chaining two or more transformers of the 
same type. However, such transformers share the same config keys and may 
require those keys to be configured with different values per transformer.
---
 .../org/apache/hudi/utilities/UtilHelpers.java     |  11 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   2 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |  12 ++-
 .../utilities/transform/ChainedTransformer.java    | 113 +++++++++++++++++++--
 .../deltastreamer/HoodieDeltaStreamerTestBase.java |  46 +++++++++
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  44 --------
 .../utilities/deltastreamer/TestTransformer.java   |  95 +++++++++++++++++
 .../functional/TestChainedTransformer.java         |  37 +++++++
 8 files changed, 300 insertions(+), 60 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 450df8aed25..721ba2eb9f4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -97,7 +97,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -191,15 +190,11 @@ public class UtilHelpers {
 
   }
 
-  public static Option<Transformer> createTransformer(List<String> classNames) 
throws IOException {
+  public static Option<Transformer> createTransformer(Option<List<String>> 
classNamesOpt) throws IOException {
     try {
-      List<Transformer> transformers = new ArrayList<>();
-      for (String className : 
Option.ofNullable(classNames).orElse(Collections.emptyList())) {
-        transformers.add(ReflectionUtils.loadClass(className));
-      }
-      return transformers.isEmpty() ? Option.empty() : Option.of(new 
ChainedTransformer(transformers));
+      return classNamesOpt.map(classNames -> classNames.isEmpty() ? null : new 
ChainedTransformer(classNames));
     } catch (Throwable e) {
-      throw new IOException("Could not load transformer class(es) " + 
classNames, e);
+      throw new IOException("Could not load transformer class(es) " + 
classNamesOpt.get(), e);
     }
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index c59510f3676..8e2b03c7849 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -285,7 +285,7 @@ public class DeltaSync implements Serializable, Closeable {
     // Register User Provided schema first
     registerAvroSchemas(schemaProvider);
 
-    this.transformer = 
UtilHelpers.createTransformer(cfg.transformerClassNames);
+    this.transformer = 
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames));
 
     this.metrics = (HoodieIngestionMetrics) 
ReflectionUtils.loadClass(cfg.ingestionMetricsClass, 
getHoodieClientConfig(this.schemaProvider));
     this.hoodieMetrics = new 
HoodieMetrics(getHoodieClientConfig(this.schemaProvider));
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 6dcb9463cb0..d99298b92a6 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -276,7 +276,17 @@ public class HoodieDeltaStreamer implements Serializable {
             + ". Allows transforming raw source Dataset to a target Dataset 
(conforming to target schema) before "
             + "writing. Default : Not set. E:g - 
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
             + "allows a SQL query templated to be passed as a transformation 
function). "
-            + "Pass a comma-separated list of subclass names to chain the 
transformations.")
+            + "Pass a comma-separated list of subclass names to chain the 
transformations. If there are two or more "
+            + "transformers using the same config keys and expect different 
values for those keys, then transformer can include "
+            + "an identifier. E:g - 
tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer. Here the 
identifier tr1 "
+            + "can be used along with property key like 
`hoodie.deltastreamer.transformer.sql.tr1` to identify properties related "
+            + "to the transformer. So effective value for 
`hoodie.deltastreamer.transformer.sql` is determined by key "
+            + "`hoodie.deltastreamer.transformer.sql.tr1` for this 
transformer. If identifier is used, it should "
+            + "be specified for all the transformers. Further the order in 
which transformer is applied is determined by the occurrence "
+            + "of transformer irrespective of the identifier used for the 
transformer. For example: In the configured value below "
+            + 
"tr2:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer,tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer
 "
+            + ", tr2 is applied before tr1 based on order of occurrence."
+    )
     public List<String> transformerClassNames = null;
 
     @Parameter(names = {"--source-limit"}, description = "Maximum amount of 
data to read from source. "
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
index 1161a737e85..22100563204 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
@@ -19,13 +19,23 @@
 package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -33,22 +43,113 @@ import java.util.stream.Collectors;
  */
 public class ChainedTransformer implements Transformer {
 
-  private List<Transformer> transformers;
+  // Delimiter used to separate class name and the property key suffix. The 
suffix comes first.
+  private static final String ID_TRANSFORMER_CLASS_NAME_DELIMITER = ":";
 
-  public ChainedTransformer(List<Transformer> transformers) {
-    this.transformers = transformers;
+  private final List<TransformerInfo> transformers;
+
+  public ChainedTransformer(List<Transformer> transformersList) {
+    this.transformers = new ArrayList<>(transformersList.size());
+    for (Transformer transformer : transformersList) {
+      this.transformers.add(new TransformerInfo(transformer));
+    }
+  }
+
+  /**
+   * Creates a chained transformer using the input transformer class names. 
Refer {@link HoodieDeltaStreamer.Config#transformerClassNames}
+   * for more information on how the transformers can be configured.
+   *
+   * @param configuredTransformers List of configured transformer class names.
+   * @param ignore Added for avoiding two methods with same erasure. Ignored.
+   */
+  public ChainedTransformer(List<String> configuredTransformers, int... 
ignore) {
+    this.transformers = new ArrayList<>(configuredTransformers.size());
+
+    Set<String> identifiers = new HashSet<>();
+    for (String configuredTransformer : configuredTransformers) {
+      if 
(!configuredTransformer.contains(ID_TRANSFORMER_CLASS_NAME_DELIMITER)) {
+        transformers.add(new 
TransformerInfo(ReflectionUtils.loadClass(configuredTransformer)));
+      } else {
+        String[] splits = 
configuredTransformer.split(ID_TRANSFORMER_CLASS_NAME_DELIMITER);
+        if (splits.length > 2) {
+          throw new IllegalArgumentException("There should only be one colon 
in a configured transformer");
+        }
+        String id = splits[0];
+        validateIdentifier(id, identifiers, configuredTransformer);
+        Transformer transformer = ReflectionUtils.loadClass(splits[1]);
+        transformers.add(new TransformerInfo(transformer, id));
+      }
+    }
+
+    
ValidationUtils.checkArgument(transformers.stream().allMatch(TransformerInfo::hasIdentifier)
+            || transformers.stream().noneMatch(TransformerInfo::hasIdentifier),
+        "Either all transformers should have identifier or none should");
   }
 
   public List<String> getTransformersNames() {
-    return transformers.stream().map(t -> 
t.getClass().getName()).collect(Collectors.toList());
+    return transformers.stream().map(t -> 
t.getTransformer().getClass().getName()).collect(Collectors.toList());
   }
 
   @Override
   public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset, TypedProperties properties) {
     Dataset<Row> dataset = rowDataset;
-    for (Transformer t : transformers) {
-      dataset = t.apply(jsc, sparkSession, dataset, properties);
+    for (TransformerInfo transformerInfo : transformers) {
+      Transformer transformer = transformerInfo.getTransformer();
+      dataset = transformer.apply(jsc, sparkSession, dataset, 
transformerInfo.getProperties(properties));
     }
     return dataset;
   }
+
+  private void validateIdentifier(String id, Set<String> identifiers, String 
configuredTransformer) {
+    ValidationUtils.checkArgument(StringUtils.nonEmpty(id), 
String.format("Transformer identifier is empty for %s", configuredTransformer));
+    if (identifiers.contains(id)) {
+      throw new IllegalArgumentException(String.format("Duplicate identifier 
%s found for transformer %s", id, configuredTransformer));
+    } else {
+      identifiers.add(id);
+    }
+  }
+
+  private static class TransformerInfo {
+    private final Transformer transformer;
+    private final Option<String> idOpt;
+
+    private TransformerInfo(Transformer transformer, String id) {
+      this.transformer = transformer;
+      this.idOpt = Option.of(id);
+    }
+
+    private TransformerInfo(Transformer transformer) {
+      this.transformer = transformer;
+      this.idOpt = Option.empty();
+    }
+
+    private Transformer getTransformer() {
+      return transformer;
+    }
+
+    private boolean hasIdentifier() {
+      return idOpt.isPresent();
+    }
+
+    private TypedProperties getProperties(TypedProperties properties) {
+      TypedProperties transformerProps = properties;
+      if (idOpt.isPresent()) {
+        // Transformer specific property keys end with the id associated with 
the transformer.
+        // Ex. For id tr1, key `hoodie.deltastreamer.transformer.sql.tr1` 
would be converted to
+        // `hoodie.deltastreamer.transformer.sql` and then passed to the 
transformer.
+        String id = idOpt.get();
+        transformerProps = new TypedProperties(properties);
+        Map<String, Object> overrideKeysMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+          String key = (String) entry.getKey();
+          if (key.endsWith("." + id)) {
+            overrideKeysMap.put(key.substring(0, key.length() - (id.length() + 
1)), entry.getValue());
+          }
+        }
+        transformerProps.putAll(overrideKeysMap);
+      }
+
+      return transformerProps;
+    }
+  }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 7e0c73f94aa..81d015f72f3 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -28,9 +28,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.TestDataSource;
+import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import org.apache.avro.Schema;
@@ -302,6 +304,50 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
     }
   }
 
+  protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer, String emptyBatchParam) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", 
"target.avsc",
+        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, 
"partition_path", emptyBatchParam);
+  }
+
+  protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "");
+  }
+
+  protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+                                       String propsFileName, String 
parquetSourceRoot, boolean addCommonProps, String partitionPath) throws 
IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, 
sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, 
addCommonProps,
+        partitionPath, "");
+  }
+
+  protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+                                       String propsFileName, String 
parquetSourceRoot, boolean addCommonProps,
+                                       String partitionPath, String 
emptyBatchParam) throws IOException {
+    // Properties used for testing delta-streamer with Parquet source
+    TypedProperties parquetProps = new TypedProperties();
+
+    if (addCommonProps) {
+      populateCommonProps(parquetProps, basePath);
+    }
+
+    parquetProps.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
+
+    parquetProps.setProperty("include", "base.properties");
+    parquetProps.setProperty("hoodie.embed.timeline.server", "false");
+    parquetProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
+    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", 
partitionPath);
+    if (useSchemaProvider) {
+      
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 basePath + "/" + sourceSchemaFile);
+      if (hasTransformer) {
+        
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 basePath + "/" + targetSchemaFile);
+      }
+    }
+    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", 
parquetSourceRoot);
+    if (!StringUtils.isNullOrEmpty(emptyBatchParam)) {
+      
parquetProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, 
emptyBatchParam);
+    }
+    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, fs, basePath + "/" 
+ propsFileName);
+  }
+
   protected static void prepareORCDFSFiles(int numRecords) throws IOException {
     prepareORCDFSFiles(numRecords, ORC_SOURCE_ROOT);
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index a70bcaa68bb..739b75ae5f7 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1716,50 +1716,6 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     testUtils.sendMessages(topicName, 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 
numRecords, HoodieTestDataGenerator.TRIP_SCHEMA)));
   }
 
-  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer, String emptyBatchParam) throws IOException {
-    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", 
"target.avsc",
-        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, 
"partition_path", emptyBatchParam);
-  }
-
-  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer) throws IOException {
-    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "");
-  }
-
-  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer, String sourceSchemaFile, String targetSchemaFile,
-                                       String propsFileName, String 
parquetSourceRoot, boolean addCommonProps, String partitionPath) throws 
IOException {
-    prepareParquetDFSSource(useSchemaProvider, hasTransformer, 
sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, 
addCommonProps,
-        partitionPath, "");
-  }
-
-  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer, String sourceSchemaFile, String targetSchemaFile,
-                                       String propsFileName, String 
parquetSourceRoot, boolean addCommonProps,
-                                       String partitionPath, String 
emptyBatchParam) throws IOException {
-    // Properties used for testing delta-streamer with Parquet source
-    TypedProperties parquetProps = new TypedProperties();
-
-    if (addCommonProps) {
-      populateCommonProps(parquetProps, basePath);
-    }
-
-    parquetProps.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
-
-    parquetProps.setProperty("include", "base.properties");
-    parquetProps.setProperty("hoodie.embed.timeline.server", "false");
-    parquetProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
-    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", 
partitionPath);
-    if (useSchemaProvider) {
-      
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 basePath + "/" + sourceSchemaFile);
-      if (hasTransformer) {
-        
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 basePath + "/" + targetSchemaFile);
-      }
-    }
-    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", 
parquetSourceRoot);
-    if (!StringUtils.isNullOrEmpty(emptyBatchParam)) {
-      
parquetProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, 
emptyBatchParam);
-    }
-    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, fs, basePath + "/" 
+ propsFileName);
-  }
-
   private void testParquetDFSSource(boolean useSchemaProvider, List<String> 
transformerClassNames) throws Exception {
     testParquetDFSSource(useSchemaProvider, transformerClassNames, false);
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java
new file mode 100644
index 00000000000..f092541ca48
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.transform.Transformer;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestTransformer extends HoodieDeltaStreamerTestBase {
+
+  @Test
+  public void testMultipleTransformersWithIdentifiers() throws Exception {
+    // Configure 3 transformers of same type. 2nd transformer has no suffix
+    String[] arr = new String [] {
+        "1:" + TimestampTransformer.class.getName(),
+        "2:" + TimestampTransformer.class.getName(),
+        "3:" + TimestampTransformer.class.getName()};
+    List<String> transformerClassNames = Arrays.asList(arr);
+
+    // Create source using TRIP_SCHEMA
+    boolean useSchemaProvider = true;
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+    int parquetRecordsCount = 10;
+    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, false, null, null);
+    prepareParquetDFSSource(useSchemaProvider, true, "source.avsc", 
"source.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "partition_path", "");
+    String tableBasePath = basePath + 
"/testMultipleTransformersWithIdentifiers" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+            transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
+            useSchemaProvider, 100000, false, null, null, "timestamp", null), 
jsc);
+
+    // Set properties for multi transformer
+    // timestamp.transformer.increment is a common config and varies between 
the transformers
+    // timestamp.transformer.multiplier is also a common config but doesn't 
change between transformers
+    Properties properties = ((HoodieDeltaStreamer.DeltaSyncService) 
deltaStreamer.getIngestionService()).getProps();
+    // timestamp value initially is set to 0
+    // timestamp = 0 * 2 + 10; (transformation 1)
+    // timestamp = 10 * 2 + 20 = 40 (transformation 2)
+    // timestamp = 40 * 2 + 30 = 110 (transformation 3)
+    properties.setProperty("timestamp.transformer.increment.1", "10");
+    properties.setProperty("timestamp.transformer.increment.3", "30");
+    properties.setProperty("timestamp.transformer.increment", "20");
+    properties.setProperty("timestamp.transformer.multiplier", "2");
+    deltaStreamer.sync();
+
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(parquetRecordsCount, 
tableBasePath, sqlContext);
+    assertEquals(0, 
sqlContext.read().format("org.apache.hudi").load(tableBasePath).where("timestamp
 != 110").count());
+  }
+
+  /**
+   * Performs transformation on `timestamp` field.
+   */
+  public static class TimestampTransformer implements Transformer {
+
+    @Override
+    public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, 
Dataset<Row> rowDataset,
+                              TypedProperties properties) {
+      int multiplier = Integer.parseInt((String) 
properties.get("timestamp.transformer.multiplier"));
+      int increment = Integer.parseInt((String) 
properties.get("timestamp.transformer.increment"));
+      return rowDataset.withColumn("timestamp", 
functions.col("timestamp").multiply(multiplier).plus(increment));
+    }
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
index b493d436e2b..26768324946 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
@@ -31,6 +31,8 @@ import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
 import java.util.List;
@@ -40,6 +42,9 @@ import static org.apache.spark.sql.types.DataTypes.StringType;
 import static org.apache.spark.sql.types.DataTypes.createStructField;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 @Tag("functional")
 public class TestChainedTransformer extends SparkClientFunctionalTestHarness {
@@ -66,4 +71,36 @@ public class TestChainedTransformer extends 
SparkClientFunctionalTestHarness {
     assertEquals(200, rows.get(1).getInt(0));
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {
+      // empty identifier
+      
":org.apache.hudi.utilities.transform.FlatteningTransformer,T2:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      // same identifier
+      
"T1:org.apache.hudi.utilities.transform.FlatteningTransformer,T1:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      // Two colons in transformer config
+      "T1::org.apache.hudi.utilities.transform.FlatteningTransformer",
+      // either all transformers have identifier or none have
+      
"org.apache.hudi.utilities.transform.FlatteningTransformer,T1:org.apache.hudi.utilities.transform.FlatteningTransformer"
+  })
+  public void testChainedTransformerValidationFails(String transformerName) {
+    try {
+      ChainedTransformer transformer = new 
ChainedTransformer(Arrays.asList(transformerName.split(",")));
+      fail();
+    } catch (Exception e) {
+      assertTrue(e instanceof IllegalArgumentException, e.getMessage());
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {
+      
"T1:org.apache.hudi.utilities.transform.FlatteningTransformer,T2:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      
"T2:org.apache.hudi.utilities.transform.FlatteningTransformer,T1:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      
"abc:org.apache.hudi.utilities.transform.FlatteningTransformer,def:org.apache.hudi.utilities.transform.FlatteningTransformer",
+      
"org.apache.hudi.utilities.transform.FlatteningTransformer,org.apache.hudi.utilities.transform.FlatteningTransformer"
+  })
+  public void testChainedTransformerValidationPasses(String transformerName) {
+    ChainedTransformer transformer = new 
ChainedTransformer(Arrays.asList(transformerName.split(",")));
+    assertNotNull(transformer);
+  }
+
 }

Reply via email to