chamikaramj commented on a change in pull request #17101:
URL: https://github.com/apache/beam/pull/17101#discussion_r840997369



##########
File path: 
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java
##########
@@ -33,25 +43,199 @@
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Wrapper for invoking external Python transforms. */
 public class ExternalPythonTransform<InputT extends PInput, OutputT extends 
POutput>
     extends PTransform<InputT, OutputT> {
-  private final String fullyQualifiedName;
-  private final Row args;
-  private final Row kwargs;
 
-  public ExternalPythonTransform(String fullyQualifiedName, Row args, Row 
kwargs) {
+  private static final SchemaRegistry SCHEMA_REGISTRY = 
SchemaRegistry.createDefault();
+  private String fullyQualifiedName;
+
+  // We preseve the order here since Schema's care about order of fields but 
the order will not
+  // matter when applying kwargs at the Python side.
+  private SortedMap<String, Object> kwargsMap;
+
+  private Object[] argsArray;
+  private Row providedKwargsRow = null;
+
+  private ExternalPythonTransform(String fullyQualifiedName) {
     this.fullyQualifiedName = fullyQualifiedName;
-    this.args = args;
-    this.kwargs = kwargs;
+    this.kwargsMap = new TreeMap<>();
+    this.argsArray = new Object[] {};
+  }
+
+  /**
+   * Instantiates a cross-language wrapper for a Python transform with a given 
transform name.
+   *
+   * @param tranformName fully qualified transform name.
+   * @param <InputT> Input {@link PCollection} type
+   * @param <OutputT> Output {@link PCollection} type
+   * @return A {@link ExternalPythonTransform} for the given transform name.
+   */
+  public static <InputT extends PInput, OutputT extends POutput>
+      ExternalPythonTransform<InputT, OutputT> from(String tranformName) {
+    return new ExternalPythonTransform<InputT, OutputT>(tranformName);
+  }
+
+  /**
+   * Positional arguments for the Python cross-language transform. If invoked 
more than once, new
+   * arguments will be appended to the previously specified arguments.
+   *
+   * @param args list of arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withArgs(Object... args) {
+    Object[] result = Arrays.copyOf(this.argsArray, this.argsArray.length + 
args.length);
+    System.arraycopy(args, 0, result, this.argsArray.length, args.length);
+    this.argsArray = result;
+    return this;
+  }
+
+  /**
+   * Specifies a single keyword argument for the Python cross-language 
transform. This may be
+   * invoked multiple times to add more than one keyword argument.
+   *
+   * @param name argument name.
+   * @param value argument value
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwarg(String name, 
Object value) {
+    if (providedKwargsRow != null) {
+      throw new IllegalArgumentException("Kwargs were specified both directly 
and as a Row object");
+    }
+    kwargsMap.put(name, value);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments for the Python cross-language transform. If 
invoked more than once,
+   * new keyword arguments map will be added to the previously prided keyword 
arguments.
+   *
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Map<String, 
Object> kwargs) {
+    kwargsMap.putAll(kwargs);
+    return this;
+  }
+
+  /**
+   * Specifies keyword arguments as a Row objects.
+   *
+   * @param kwargs keyword arguments as a {@link Row} objects. An empty Row 
represents zero keyword
+   *     arguments.
+   * @return updated wrapper for the cross-language transform.
+   */
+  public ExternalPythonTransform<InputT, OutputT> withKwargs(Row kwargs) {
+    if (this.kwargsMap.size() > 0) {
+      throw new IllegalArgumentException("Kwargs were specified both directly 
and as a Row object");
+    }
+    this.providedKwargsRow = kwargs;
+    return this;
+  }
+
+  @VisibleForTesting
+  Row buildOrGetKvargsRow() {
+    if (providedKwargsRow != null) {
+      return providedKwargsRow;
+    } else if (kwargsMap.size() == 0) {
+      return Row.nullRow(Schema.of());
+    } else {
+      Schema schema =
+          generateSchemaFromFieldValues(
+              kwargsMap.values().toArray(), kwargsMap.keySet().toArray(new 
String[] {}));
+      return Row.withSchema(schema)
+          .addValues(convertComplexTypesToRows(kwargsMap.values().toArray()))
+          .build();
+    }
+  }
+
+  private static boolean isCustomType(java.lang.Class<?> type) {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to