nsivabalan commented on code in PR #8514:
URL: https://github.com/apache/hudi/pull/8514#discussion_r1183180023
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -276,7 +276,17 @@ public static class Config implements Serializable {
+ ". Allows transforming raw source Dataset to a target Dataset
(conforming to target schema) before "
+ "writing. Default : Not set. E:g -
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
+ "allows a SQL query templated to be passed as a transformation
function). "
- + "Pass a comma-separated list of subclass names to chain the
transformations.")
+ + "Pass a comma-separated list of subclass names to chain the
transformations. Transformer can also include "
+ + "an identifier. E:g -
tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer. Here the
identifier tr1 "
+ + "can be used along with property key like
`hoodie.deltastreamer.transformer.sql.tr1` to identify properties related "
+ + "to the transformer. So effective value for
`hoodie.deltastreamer.transformer.sql` is determined by key "
+ + "`hoodie.deltastreamer.transformer.sql.tr1` for this
transformer. This is useful when there are two or more "
+ + "transformers using the same config keys and expect different
values for those keys. If identifier is used, it should "
+ + "be specified for all the transformers. Further the order in
which transformer is applied is determined by the occurrence "
+ + "of transformer irrespective of the identifier used for the
transformer. For example: In the configured value below "
+ +
"tr2:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer,tr1:org.apache.hudi.utilities.transform.SqlQueryBasedTransformer
"
+ + ", tr2 is applied before tr1 based on order of occurrence."
+ )
Review Comment:
Can we call out that this identifier format is not strictly required unless
users need to configure multiple transformers of the same type?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -19,36 +19,137 @@
package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/**
* A {@link Transformer} to chain other {@link Transformer}s and apply
sequentially.
*/
public class ChainedTransformer implements Transformer {
- private List<Transformer> transformers;
+ // Delimiter used to separate class name and the property key suffix. The
suffix comes first.
+ private static final String TRANSFORMER_CLASS_NAME_ID_DELIMITER = ":";
- public ChainedTransformer(List<Transformer> transformers) {
- this.transformers = transformers;
+ private final List<TransformerInfo> transformers;
+
+ public ChainedTransformer(List<Transformer> transformersList) {
+ this.transformers = new ArrayList<>(transformersList.size());
+ for (Transformer transformer : transformersList) {
+ this.transformers.add(new TransformerInfo(transformer));
+ }
+ }
+
+ /**
+ * Creates a chained transformer using the input transformer class names.
Refer {@link HoodieDeltaStreamer.Config#transformerClassNames}
+ * for more information on how the transformers can be configured.
+ *
+ * @param configuredTransformers List of configured transformer class names.
+ * @param ignore Added for avoiding two methods with same erasure. Ignored.
+ */
+ public ChainedTransformer(List<String> configuredTransformers, int...
ignore) {
+ this.transformers = new ArrayList<>(configuredTransformers.size());
+
+ Set<String> identifiers = new HashSet<>();
+ for (String configuredTransformer : configuredTransformers) {
+ if
(!configuredTransformer.contains(TRANSFORMER_CLASS_NAME_ID_DELIMITER)) {
+ transformers.add(new
TransformerInfo(ReflectionUtils.loadClass(configuredTransformer)));
+ } else {
+ String[] splits =
configuredTransformer.split(TRANSFORMER_CLASS_NAME_ID_DELIMITER);
+ if (splits.length > 2) {
+ throw new IllegalArgumentException("There should only be one colon
in a configured transformer");
+ }
+ String id = splits[0];
+ validateIdentifier(id, identifiers, configuredTransformer);
+ Transformer transformer = ReflectionUtils.loadClass(splits[1]);
+ transformers.add(new TransformerInfo(transformer, id));
+ }
+ }
+
+
ValidationUtils.checkArgument(transformers.stream().allMatch(TransformerInfo::hasIdentifier)
+ || transformers.stream().noneMatch(TransformerInfo::hasIdentifier),
+ "Either all transformers should have identifier or none should");
}
public List<String> getTransformersNames() {
- return transformers.stream().map(t ->
t.getClass().getName()).collect(Collectors.toList());
+ return transformers.stream().map(t ->
t.getTransformer().getClass().getName()).collect(Collectors.toList());
}
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
Dataset<Row> rowDataset, TypedProperties properties) {
Dataset<Row> dataset = rowDataset;
- for (Transformer t : transformers) {
- dataset = t.apply(jsc, sparkSession, dataset, properties);
+ for (TransformerInfo transformerInfo : transformers) {
+ Transformer transformer = transformerInfo.getTransformer();
+ dataset = transformer.apply(jsc, sparkSession, dataset,
transformerInfo.getProperties(properties));
}
return dataset;
}
+
+ private void validateIdentifier(String id, Set<String> identifiers, String
configuredTransformer) {
+ ValidationUtils.checkArgument(StringUtils.nonEmpty(id),
String.format("Transformer identifier is empty for %s", configuredTransformer));
+ if (identifiers.contains(id)) {
+ throw new IllegalArgumentException(String.format("Duplicate identifier
%s found for transformer %s", id, configuredTransformer));
+ } else {
+ identifiers.add(id);
+ }
+ }
+
+ private static class TransformerInfo {
+ private final Transformer transformer;
+ private final Option<String> idOpt;
+
+ private TransformerInfo(Transformer transformer, String idOpt) {
+ this.transformer = transformer;
+ this.idOpt = Option.of(idOpt);
+ }
+
+ private TransformerInfo(Transformer transformer) {
+ this.transformer = transformer;
+ this.idOpt = Option.empty();
+ }
+
+ private Transformer getTransformer() {
+ return transformer;
+ }
+
+ private boolean hasIdentifier() {
+ return idOpt.isPresent();
+ }
+
+ private TypedProperties getProperties(TypedProperties properties) {
+ TypedProperties transformerProps = properties;
+ if (idOpt.isPresent()) {
+ // Transformer specific property keys end with the id associated with
the transformer.
+ // Ex. For id tr1, key `hoodie.deltastreamer.transformer.sql.tr1`
would be converted to
+ // `hoodie.deltastreamer.transformer.sql` and then passed to the
transformer.
+ String id = idOpt.get();
+ transformerProps = new TypedProperties(properties);
+ Map<String, Object> overrideKeysMap = new HashMap<>();
+ for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+ String key = (String) entry.getKey();
+ if (key.endsWith("." + id)) {
Review Comment:
don't we need to remove the corresponding props from transformerProps in
addition to adding the trimmed ones?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -19,36 +19,137 @@
package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/**
* A {@link Transformer} to chain other {@link Transformer}s and apply
sequentially.
*/
public class ChainedTransformer implements Transformer {
- private List<Transformer> transformers;
+ // Delimiter used to separate class name and the property key suffix. The
suffix comes first.
+ private static final String TRANSFORMER_CLASS_NAME_ID_DELIMITER = ":";
- public ChainedTransformer(List<Transformer> transformers) {
- this.transformers = transformers;
+ private final List<TransformerInfo> transformers;
+
+ public ChainedTransformer(List<Transformer> transformersList) {
+ this.transformers = new ArrayList<>(transformersList.size());
+ for (Transformer transformer : transformersList) {
+ this.transformers.add(new TransformerInfo(transformer));
+ }
+ }
+
+ /**
+ * Creates a chained transformer using the input transformer class names.
Refer {@link HoodieDeltaStreamer.Config#transformerClassNames}
+ * for more information on how the transformers can be configured.
+ *
+ * @param configuredTransformers List of configured transformer class names.
+ * @param ignore Added for avoiding two methods with same erasure. Ignored.
+ */
+ public ChainedTransformer(List<String> configuredTransformers, int...
ignore) {
+ this.transformers = new ArrayList<>(configuredTransformers.size());
+
+ Set<String> identifiers = new HashSet<>();
+ for (String configuredTransformer : configuredTransformers) {
+ if
(!configuredTransformer.contains(TRANSFORMER_CLASS_NAME_ID_DELIMITER)) {
+ transformers.add(new
TransformerInfo(ReflectionUtils.loadClass(configuredTransformer)));
+ } else {
+ String[] splits =
configuredTransformer.split(TRANSFORMER_CLASS_NAME_ID_DELIMITER);
+ if (splits.length > 2) {
+ throw new IllegalArgumentException("There should only be one colon
in a configured transformer");
+ }
+ String id = splits[0];
+ validateIdentifier(id, identifiers, configuredTransformer);
+ Transformer transformer = ReflectionUtils.loadClass(splits[1]);
+ transformers.add(new TransformerInfo(transformer, id));
+ }
+ }
+
+
ValidationUtils.checkArgument(transformers.stream().allMatch(TransformerInfo::hasIdentifier)
+ || transformers.stream().noneMatch(TransformerInfo::hasIdentifier),
+ "Either all transformers should have identifier or none should");
}
public List<String> getTransformersNames() {
- return transformers.stream().map(t ->
t.getClass().getName()).collect(Collectors.toList());
+ return transformers.stream().map(t ->
t.getTransformer().getClass().getName()).collect(Collectors.toList());
}
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
Dataset<Row> rowDataset, TypedProperties properties) {
Dataset<Row> dataset = rowDataset;
- for (Transformer t : transformers) {
- dataset = t.apply(jsc, sparkSession, dataset, properties);
+ for (TransformerInfo transformerInfo : transformers) {
+ Transformer transformer = transformerInfo.getTransformer();
+ dataset = transformer.apply(jsc, sparkSession, dataset,
transformerInfo.getProperties(properties));
}
return dataset;
}
+
+ private void validateIdentifier(String id, Set<String> identifiers, String
configuredTransformer) {
+ ValidationUtils.checkArgument(StringUtils.nonEmpty(id),
String.format("Transformer identifier is empty for %s", configuredTransformer));
+ if (identifiers.contains(id)) {
+ throw new IllegalArgumentException(String.format("Duplicate identifier
%s found for transformer %s", id, configuredTransformer));
+ } else {
+ identifiers.add(id);
+ }
+ }
+
+ private static class TransformerInfo {
+ private final Transformer transformer;
+ private final Option<String> idOpt;
+
+ private TransformerInfo(Transformer transformer, String idOpt) {
Review Comment:
String id
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java:
##########
@@ -191,15 +190,11 @@ public static SchemaPostProcessor
createSchemaPostProcessor(
}
- public static Option<Transformer> createTransformer(List<String> classNames)
throws IOException {
+ public static Option<Transformer> createTransformer(Option<List<String>>
classNamesOpt) throws IOException {
try {
- List<Transformer> transformers = new ArrayList<>();
- for (String className :
Option.ofNullable(classNames).orElse(Collections.emptyList())) {
- transformers.add(ReflectionUtils.loadClass(className));
- }
- return transformers.isEmpty() ? Option.empty() : Option.of(new
ChainedTransformer(transformers));
+ return classNamesOpt.map(classNames -> classNames.isEmpty() ? null : new
ChainedTransformer(classNames));
} catch (Throwable e) {
- throw new IOException("Could not load transformer class(es) " +
classNames, e);
+ throw new IOException("Could not load transformer class(es) " +
classNamesOpt, e);
Review Comment:
classNamesOpt.get()
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java:
##########
@@ -19,36 +19,137 @@
package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/**
* A {@link Transformer} to chain other {@link Transformer}s and apply
sequentially.
*/
public class ChainedTransformer implements Transformer {
- private List<Transformer> transformers;
+ // Delimiter used to separate class name and the property key suffix. The
suffix comes first.
+ private static final String TRANSFORMER_CLASS_NAME_ID_DELIMITER = ":";
Review Comment:
ID_TRANSFORMER_CLASS_NAME_DELIMITER
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]