[ 
https://issues.apache.org/jira/browse/BEAM-14343?focusedWorklogId=759664&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-759664
 ]

ASF GitHub Bot logged work on BEAM-14343:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Apr/22 00:20
            Start Date: 21/Apr/22 00:20
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on code in PR #17418:
URL: https://github.com/apache/beam/pull/17418#discussion_r854665303


##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -64,8 +65,9 @@
   private @Nullable Object @NonNull [] argsArray;
   private @Nullable Row providedKwargsRow;
 
-  private ExternalPythonTransform(String fullyQualifiedName) {
+  private ExternalPythonTransform(String fullyQualifiedName, int expansionPort) {

Review Comment:
   Can we change this to an address, so that the expansion service can be
   hosted remotely (similar to Python)?
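
To illustrate the suggestion above, here is a minimal sketch of accepting a "host:port" address instead of a bare port. The class `ExpansionAddress` and its `parse` method are hypothetical names invented for this example; they are not part of Beam or of the PR under review.

```java
// Hypothetical helper, not Beam API: splits "host:port" into its parts.
final class ExpansionAddress {
  final String host;
  final int port;

  private ExpansionAddress(String host, int port) {
    this.host = host;
    this.port = port;
  }

  // Parses an address of the form "host:port"; the last ':' separates the port.
  static ExpansionAddress parse(String address) {
    int idx = address.lastIndexOf(':');
    if (idx <= 0 || idx == address.length() - 1) {
      throw new IllegalArgumentException("Expected host:port, got: " + address);
    }
    int port;
    try {
      port = Integer.parseInt(address.substring(idx + 1));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid port in: " + address, e);
    }
    if (port < 1 || port > 65535) {
      throw new IllegalArgumentException("Port out of range in: " + address);
    }
    return new ExpansionAddress(address.substring(0, idx), port);
  }
}
```

With an address like this, a caller could pass "localhost:8097" for a local service or any reachable host for a remote one, rather than being limited to a local port.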



##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -271,29 +279,21 @@ public OutputT expand(InputT input) {
                   ByteString.copyFrom(
                       CoderUtils.encodeToByteArray(RowCoder.of(payloadSchema), payloadRow)))
               .build();
-      try (AutoCloseable p = service.start()) {
-        PythonService.waitForPort("localhost", port, 15000);
-        PTransform<PInput, PCollectionTuple> transform =
-            External.<PInput, Object>of(
-                    "beam:transforms:python:fully_qualified_named",
-                    payload.toByteArray(),
-                    "localhost:" + port)
-                .withMultiOutputs();
-        PCollectionTuple outputs;
-        if (input instanceof PCollection) {
-          outputs = ((PCollection<?>) input).apply(transform);
-        } else if (input instanceof PCollectionTuple) {
-          outputs = ((PCollectionTuple) input).apply(transform);
-        } else if (input instanceof PBegin) {
-          outputs = ((PBegin) input).apply(transform);
-        } else {
-          throw new RuntimeException("Unhandled input type " + input.getClass());
-        }
-        Set<TupleTag<?>> tags = outputs.getAll().keySet();
-        if (tags.size() == 1) {
-          return (OutputT) outputs.get(Iterables.getOnlyElement(tags));
-        } else {
-          return (OutputT) outputs;
+      if (expansionPort > 0) {
+        PythonService.waitForPort("localhost", expansionPort, 15000);

Review Comment:
   Why do we have to do this when a port/address is specified?
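
For context on what such a wait involves, here is a rough sketch of a port-readiness check. `PortProbe.waitForPort` is a hypothetical stand-in written for this example; the actual implementation of `PythonService.waitForPort` is not shown in this thread and may differ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical stand-in, not Beam API: polls until a TCP port accepts connections.
final class PortProbe {
  // Returns true once a connection succeeds, false if the timeout elapses first.
  static boolean waitForPort(String host, int port, long timeoutMillis)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    long now;
    while ((now = System.currentTimeMillis()) < deadline) {
      // Cap each attempt so it cannot run far past the overall deadline.
      int attemptTimeout = (int) Math.min(1000L, deadline - now);
      try (Socket socket = new Socket()) {
        socket.connect(new InetSocketAddress(host, port), attemptTimeout);
        return true;
      } catch (IOException e) {
        Thread.sleep(100); // Not listening yet; retry shortly.
      }
    }
    return false;
  }
}
```

A wait like this is mainly useful when the caller has just launched the service and must give it time to bind. For a user-supplied address the service is presumably already running, which is the point the question above raises.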





Issue Time Tracking
-------------------

    Worklog Id:     (was: 759664)
    Time Spent: 0.5h  (was: 20m)

> Allow expansion service override in ExternalPythonTransform
> -----------------------------------------------------------
>
>                 Key: BEAM-14343
>                 URL: https://issues.apache.org/jira/browse/BEAM-14343
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language, sdk-java-core
>            Reporter: Heejong Lee
>            Assignee: Heejong Lee
>            Priority: P2
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Allow expansion service override in ExternalPythonTransform



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
