lokeshj1703 commented on code in PR #8574:
URL: https://github.com/apache/hudi/pull/8574#discussion_r1177791890
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -18,25 +18,89 @@
package org.apache.hudi.utilities.transform;
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
/**
* A {@link Transformer} to chain other {@link Transformer}s and apply them sequentially.
*/
public class ChainedTransformer implements Transformer {
- private List<Transformer> transformers;
+ // Delimiter separating the property key suffix from the transformer class name. The suffix comes first.
+ private static final String TRANSFORMER_CLASS_NAME_KEY_SUFFIX_DELIMITER =
":";
+
+ private final List<Transformer> transformers;
+ private final Map<Transformer, String> transformerToPropKeySuffix;
+ private Option<Schema> sourceSchemaOpt = Option.empty();
+ private boolean enableSchemaValidation = false;
public ChainedTransformer(List<Transformer> transformers) {
this.transformers = transformers;
+ this.transformerToPropKeySuffix = new HashMap<>(transformers.size());
+ for (Transformer transformer : this.transformers) {
+ transformerToPropKeySuffix.put(transformer, "");
+ }
+ }
+
+ /**
+ * Creates a chained transformer using the input transformer class names. The name can also include
+ * a suffix. This suffix can be appended to the property keys to identify properties related to the transformer.
+ * E.g., tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer can be used along with property key
+ * hoodie.deltastreamer.transformer.sql.tr1. Here tr1 is a suffix used to identify the keys specific to this transformer.
+ * This suffix is removed from the configuration keys when the transformer is used. This is useful when there are two or more
+ * transformers that use the same config keys but expect different values for those keys.
+ *
+ * @param sourceSchemaOpt Source Schema
+ * @param configuredTransformers List of configured transformer class names.
+ * @param enableSchemaValidation if true, schema is validated for the transformed data against expected schema.
+ * Expected schema is provided by {@link Transformer#schemaTransform}
+ */
+ public ChainedTransformer(List<String> configuredTransformers,
Option<Schema> sourceSchemaOpt, boolean enableSchemaValidation) {
+ this.transformerToPropKeySuffix = new
HashMap<>(configuredTransformers.size());
+ this.transformers = new ArrayList<>(configuredTransformers.size());
+ this.enableSchemaValidation = enableSchemaValidation;
+ this.sourceSchemaOpt = sourceSchemaOpt;
+ if (enableSchemaValidation) {
+ ValidationUtils.checkArgument(sourceSchemaOpt.isPresent(), "Source
schema should not be null");
+ }
+
+ List<Pair<String, String>> transformerClassNamesToSuffixList = new
ArrayList<>(configuredTransformers.size());
+ for (String configuredTransformer : configuredTransformers) {
+ if (!configuredTransformer.contains(":")) {
+ transformerClassNamesToSuffixList.add(Pair.of(configuredTransformer,
""));
+ } else {
+ String[] splits = configuredTransformer.split(":");
+ if (splits.length > 2) {
+ throw new IllegalArgumentException("There should only be one colon
in a configured transformer");
+ }
+ transformerClassNamesToSuffixList.add(Pair.of(splits[1], splits[0]));
+ }
+ }
+
+ for (Pair<String, String> pair : transformerClassNamesToSuffixList) {
Review Comment:
Addressed in https://github.com/apache/hudi/pull/8514
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]