[ 
https://issues.apache.org/jira/browse/BEAM-14343?focusedWorklogId=761320&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-761320
 ]

ASF GitHub Bot logged work on BEAM-14343:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Apr/22 17:36
            Start Date: 23/Apr/22 17:36
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on code in PR #17418:
URL: https://github.com/apache/beam/pull/17418#discussion_r856943846


##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -65,8 +66,10 @@
   private @Nullable Object @NonNull [] argsArray;
   private @Nullable Row providedKwargsRow;
 
-  private ExternalPythonTransform(String fullyQualifiedName, int expansionPort) {
+  private ExternalPythonTransform(
+      String fullyQualifiedName, String expansionHost, int expansionPort) {

Review Comment:
   Still unsure why the API differs from Python, where we take a single "expansion_service" address:
https://github.com/apache/beam/blob/3f2e3c7c9eccb9d40370cbc70e9a451a4b5573f5/sdks/python/apache_beam/transforms/external.py#L417
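
For illustration only, here is a minimal sketch of how a single "host:port" address could be split into the host and port that the Java constructor currently takes separately. `ExpansionAddress` and its methods are hypothetical names and are not part of Beam or of this PR. The sketch assumes the address always has the form host:port.

```java
import java.util.Objects;

/** Hypothetical helper (not a Beam class): splits a "host:port" address. */
final class ExpansionAddress {
  private final String host;
  private final int port;

  private ExpansionAddress(String host, int port) {
    this.host = host;
    this.port = port;
  }

  /** Parses "host:port"; throws IllegalArgumentException on malformed input. */
  static ExpansionAddress parse(String address) {
    Objects.requireNonNull(address, "address");
    // Split on the last colon so the host part may itself contain colons.
    int idx = address.lastIndexOf(':');
    if (idx <= 0 || idx == address.length() - 1) {
      throw new IllegalArgumentException("Expected host:port but got: " + address);
    }
    String host = address.substring(0, idx);
    int port;
    try {
      port = Integer.parseInt(address.substring(idx + 1));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid port in address: " + address, e);
    }
    if (port < 0 || port > 65535) {
      throw new IllegalArgumentException("Port out of range in address: " + address);
    }
    return new ExpansionAddress(host, port);
  }

  String host() {
    return host;
  }

  int port() {
    return port;
  }
}
```

With a helper along these lines, a caller could pass one string such as "localhost:8097", and the transform could derive both values internally, which would keep the Java surface closer to the Python one.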



##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -268,20 +268,31 @@ public OutputT expand(InputT input) {
     Row argsRow = buildOrGetArgsRow();
     Row kwargsRow = buildOrGetKwargsRow();
     try {
-      Schema payloadSchema =
-          Schema.of(
-              Schema.Field.of("constructor", Schema.FieldType.STRING),
-              Schema.Field.of("args", Schema.FieldType.row(argsRow.getSchema())),
-              Schema.Field.of("kwargs", Schema.FieldType.row(kwargsRow.getSchema())));
+      Schema.Builder schemaBuilder = Schema.builder();
+      schemaBuilder.addStringField("constructor");
+      if (argsRow.getValues().size() > 0) {
+        schemaBuilder.addRowField("args", argsRow.getSchema());
+      }
+      if (kwargsRow.getValues().size() > 0) {
+        schemaBuilder.addRowField("kwargs", kwargsRow.getSchema());
+      }
+      Schema payloadSchema = schemaBuilder.build();
       payloadSchema.setUUID(UUID.randomUUID());
-      Row payloadRow =
-          Row.withSchema(payloadSchema).addValues(fullyQualifiedName, argsRow, kwargsRow).build();
+      Row.Builder payloadRowBuilder = Row.withSchema(payloadSchema);
+      payloadRowBuilder.addValue(fullyQualifiedName);
+      if (argsRow.getValues().size() > 0) {

Review Comment:
   Please add a comment and a unit test so that this doesn't get broken by a future change.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 761320)
    Time Spent: 1h 10m  (was: 1h)

> Allow expansion service override in ExternalPythonTransform
> -----------------------------------------------------------
>
>                 Key: BEAM-14343
>                 URL: https://issues.apache.org/jira/browse/BEAM-14343
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language, sdk-java-core
>            Reporter: Heejong Lee
>            Assignee: Heejong Lee
>            Priority: P2
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Allow expansion service override in ExternalPythonTransform



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
