This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d39e9c5779b Further increase transient expansion service timeout (#25176)
d39e9c5779b is described below
commit d39e9c5779b0d2aeaf105ada5d4a2ea1600a7a80
Author: Yi Hu <[email protected]>
AuthorDate: Wed Jan 25 20:14:12 2023 -0500
Further increase transient expansion service timeout (#25176)
* Increase transient expansion service timeout
---
.../sdk/extensions/python/PythonExternalTransform.java | 15 +++------------
1 file changed, 3 insertions(+), 12 deletions(-)
diff --git a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
index 1959f62dd60..6d97f7e2295 100644
--- a/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
+++ b/sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java
@@ -65,15 +65,11 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/** Wrapper for invoking external Python transforms. */
public class PythonExternalTransform<InputT extends PInput, OutputT extends
POutput>
extends PTransform<InputT, OutputT> {
- private static final Logger LOG = LoggerFactory.getLogger(PythonExternalTransform.class);
-
private static final SchemaRegistry SCHEMA_REGISTRY =
SchemaRegistry.createDefault();
private String fullyQualifiedName;
@@ -448,7 +444,7 @@ public class PythonExternalTransform<InputT extends PInput,
OutputT extends POut
PythonService.waitForPort(
Iterables.get(Splitter.on(':').split(expansionService), 0),
Integer.parseInt(Iterables.get(Splitter.on(':').split(expansionService), 1)),
- 60000);
+ 15000);
return apply(input, expansionService, payload);
} else {
int port = PythonService.findAvailablePort();
@@ -472,13 +468,8 @@ public class PythonExternalTransform<InputT extends
PInput, OutputT extends POut
"apache_beam.runners.portability.expansion_service_main",
args.build())
.withExtraPackages(extraPackages);
try (AutoCloseable p = service.start()) {
- // allow more time for service with extra packages to response.
- int timeoutSeconds = extraPackages.isEmpty() ? 15 : 30;
- LOG.info(
- "Expanding Python external transform {} using default transient expansion service with timeout {}s.",
- fullyQualifiedName,
- timeoutSeconds);
- PythonService.waitForPort("localhost", port, timeoutSeconds * 1000);
+ // allow more time waiting for the port ready for transient expansion service setup.
+ PythonService.waitForPort("localhost", port, 60000);
return apply(input, String.format("localhost:%s", port), payload);
}
}