[
https://issues.apache.org/jira/browse/BEAM-14343?focusedWorklogId=759707&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-759707
]
ASF GitHub Bot logged work on BEAM-14343:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Apr/22 01:30
Start Date: 21/Apr/22 01:30
Worklog Time Spent: 10m
Work Description: ihji commented on code in PR #17418:
URL: https://github.com/apache/beam/pull/17418#discussion_r854698330
##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -271,29 +279,21 @@ public OutputT expand(InputT input) {
ByteString.copyFrom(
CoderUtils.encodeToByteArray(RowCoder.of(payloadSchema), payloadRow)))
.build();
- try (AutoCloseable p = service.start()) {
- PythonService.waitForPort("localhost", port, 15000);
- PTransform<PInput, PCollectionTuple> transform =
- External.<PInput, Object>of(
- "beam:transforms:python:fully_qualified_named",
- payload.toByteArray(),
- "localhost:" + port)
- .withMultiOutputs();
- PCollectionTuple outputs;
- if (input instanceof PCollection) {
- outputs = ((PCollection<?>) input).apply(transform);
- } else if (input instanceof PCollectionTuple) {
- outputs = ((PCollectionTuple) input).apply(transform);
- } else if (input instanceof PBegin) {
- outputs = ((PBegin) input).apply(transform);
- } else {
- throw new RuntimeException("Unhandled input type " + input.getClass());
- }
- Set<TupleTag<?>> tags = outputs.getAll().keySet();
- if (tags.size() == 1) {
- return (OutputT) outputs.get(Iterables.getOnlyElement(tags));
- } else {
- return (OutputT) outputs;
+ if (expansionPort > 0) {
+ PythonService.waitForPort("localhost", expansionPort, 15000);
Review Comment:
There's still no guarantee that the specified service is already up when we
send the expansion request. Expansion services sometimes need a few seconds to
launch fully, so we wait for the port here as well.
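For illustration only, the kind of wait discussed here can be sketched as a polling loop that retries a TCP connection until it succeeds or a timeout elapses. This is not the actual implementation of `PythonService.waitForPort`; the class name `PortWaiter`, the 500 ms connect timeout, and the 100 ms retry delay are assumptions made for the example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper; not part of the Beam SDK. */
public class PortWaiter {

  /**
   * Polls host:port until a TCP connection succeeds or timeoutMillis elapses.
   * Returns true if the port accepted a connection, false on timeout.
   */
  public static boolean waitForPort(String host, int port, int timeoutMillis)
      throws InterruptedException {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    // Compare via subtraction so the check stays correct if nanoTime wraps.
    while (deadline - System.nanoTime() > 0) {
      try (Socket socket = new Socket()) {
        // Assumed per-attempt connect timeout of 500 ms.
        socket.connect(new InetSocketAddress(host, port), 500);
        return true;
      } catch (IOException e) {
        // Nothing is listening yet; back off briefly before retrying.
        Thread.sleep(100);
      }
    }
    return false;
  }
}
```

Note that an open port only shows that something is listening. A service may accept connections before it is ready to answer expansion requests, so a check like this narrows the race rather than eliminating it.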
Issue Time Tracking
-------------------
Worklog Id: (was: 759707)
Time Spent: 50m (was: 40m)
> Allow expansion service override in ExternalPythonTransform
> -----------------------------------------------------------
>
> Key: BEAM-14343
> URL: https://issues.apache.org/jira/browse/BEAM-14343
> Project: Beam
> Issue Type: Bug
> Components: cross-language, sdk-java-core
> Reporter: Heejong Lee
> Assignee: Heejong Lee
> Priority: P2
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Allow expansion service override in ExternalPythonTransform