[ https://issues.apache.org/jira/browse/BEAM-14343?focusedWorklogId=759664&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-759664 ]
ASF GitHub Bot logged work on BEAM-14343:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Apr/22 00:20
Start Date: 21/Apr/22 00:20
Worklog Time Spent: 10m
Work Description: chamikaramj commented on code in PR #17418:
URL: https://github.com/apache/beam/pull/17418#discussion_r854665303
##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -64,8 +65,9 @@
private @Nullable Object @NonNull [] argsArray;
private @Nullable Row providedKwargsRow;
- private ExternalPythonTransform(String fullyQualifiedName) {
+ private ExternalPythonTransform(String fullyQualifiedName, int expansionPort) {
Review Comment:
Can we change this to an address so that the expansion service can be hosted
remotely (similar to Python)?
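To illustrate the suggestion, here is a minimal sketch of accepting a "host:port" address instead of a bare port. `ExpansionAddress` and `parse` are hypothetical names used only for illustration and are not part of the Beam API; the sketch also assumes that a bare port should fall back to "localhost" and does not handle IPv6 literals.

```java
import java.util.Objects;

/** Hypothetical holder for an expansion service address; not part of the Beam API. */
final class ExpansionAddress {
  final String host;
  final int port;

  private ExpansionAddress(String host, int port) {
    this.host = host;
    this.port = port;
  }

  /** Parses "host:port"; a bare port such as "8097" is assumed to mean localhost. */
  static ExpansionAddress parse(String address) {
    Objects.requireNonNull(address, "address");
    int idx = address.lastIndexOf(':');
    String host = idx < 0 ? "localhost" : address.substring(0, idx);
    String portText = idx < 0 ? address : address.substring(idx + 1);
    if (host.isEmpty()) {
      throw new IllegalArgumentException("Missing host in address: " + address);
    }
    int port;
    try {
      port = Integer.parseInt(portText);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid port in address: " + address, e);
    }
    if (port < 1 || port > 65535) {
      throw new IllegalArgumentException("Port out of range in address: " + address);
    }
    return new ExpansionAddress(host, port);
  }

  /** Renders the address back in "host:port" form. */
  @Override
  public String toString() {
    return host + ":" + port;
  }
}
```

With something like this, the constructor could take a single address string, and existing callers that only know a local port could still pass the port number as text.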
##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -271,29 +279,21 @@ public OutputT expand(InputT input) {
ByteString.copyFrom(
CoderUtils.encodeToByteArray(RowCoder.of(payloadSchema), payloadRow)))
.build();
- try (AutoCloseable p = service.start()) {
- PythonService.waitForPort("localhost", port, 15000);
- PTransform<PInput, PCollectionTuple> transform =
- External.<PInput, Object>of(
- "beam:transforms:python:fully_qualified_named",
- payload.toByteArray(),
- "localhost:" + port)
- .withMultiOutputs();
- PCollectionTuple outputs;
- if (input instanceof PCollection) {
- outputs = ((PCollection<?>) input).apply(transform);
- } else if (input instanceof PCollectionTuple) {
- outputs = ((PCollectionTuple) input).apply(transform);
- } else if (input instanceof PBegin) {
- outputs = ((PBegin) input).apply(transform);
- } else {
- throw new RuntimeException("Unhandled input type " + input.getClass());
- }
- Set<TupleTag<?>> tags = outputs.getAll().keySet();
- if (tags.size() == 1) {
- return (OutputT) outputs.get(Iterables.getOnlyElement(tags));
- } else {
- return (OutputT) outputs;
+ if (expansionPort > 0) {
+ PythonService.waitForPort("localhost", expansionPort, 15000);
Review Comment:
Why do we have to do this when a port/address is specified?
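For context on what such a wait involves, here is a minimal sketch of a readiness probe that polls a TCP port until it accepts a connection or a timeout elapses. `PortProbe` is a hypothetical class written for illustration; it is not the implementation of `PythonService.waitForPort`, and the 500 ms connect timeout and 100 ms retry interval are assumed values.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical readiness probe; not the Beam implementation of waitForPort. */
final class PortProbe {
  /** Returns true once host:port accepts a TCP connection, false on timeout or interrupt. */
  static boolean waitForPort(String host, int port, long timeoutMillis) {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    do {
      try (Socket socket = new Socket()) {
        // Assumed per-attempt connect timeout of 500 ms.
        socket.connect(new InetSocketAddress(host, port), 500);
        return true;
      } catch (IOException e) {
        // Not reachable yet: back off briefly before retrying.
        try {
          Thread.sleep(100);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
    } while (System.nanoTime() - deadline < 0);
    return false;
  }
}
```

A probe like this is mainly useful when the SDK has just started the service itself; for a user-supplied address that is presumably already serving, the first attempt would normally succeed right away, which is the case the question above is about.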
Issue Time Tracking
-------------------
Worklog Id: (was: 759664)
Time Spent: 0.5h (was: 20m)
> Allow expansion service override in ExternalPythonTransform
> -----------------------------------------------------------
>
> Key: BEAM-14343
> URL: https://issues.apache.org/jira/browse/BEAM-14343
> Project: Beam
> Issue Type: Bug
> Components: cross-language, sdk-java-core
> Reporter: Heejong Lee
> Assignee: Heejong Lee
> Priority: P2
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Allow expansion service override in ExternalPythonTransform