This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 62bdf56e210 Issue warning on long running DoFn.Setup on legacy Dataflow runner (#34215)
62bdf56e210 is described below
commit 62bdf56e210e6ea0e3be2bfad180efc19b4e03aa
Author: Yi Hu <[email protected]>
AuthorDate: Tue Mar 11 15:28:18 2025 -0400
Issue warning on long running DoFn.Setup on legacy Dataflow runner (#34215)
---
.../beam/runners/dataflow/worker/DoFnInstanceManagers.java | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DoFnInstanceManagers.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DoFnInstanceManagers.java
index 28fc1df3958..60a1b067cc5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DoFnInstanceManagers.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DoFnInstanceManagers.java
@@ -23,12 +23,15 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.DoFnInfo;
import org.apache.beam.sdk.util.SerializableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Common {@link DoFnInstanceManager} implementations. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class DoFnInstanceManagers {
+ private static final Logger LOG = LoggerFactory.getLogger(DoFnInstanceManagers.class);
/**
* Returns a {@link DoFnInstanceManager} that returns {@link DoFnInfo} instances obtained by
* deserializing the provided bytes. {@link DoFnInstanceManager} will call {@link DoFn.Setup} as
@@ -83,7 +86,18 @@ public class DoFnInstanceManagers {
private DoFnInfo<?, ?> deserializeCopy() throws Exception {
DoFnInfo<?, ?> fn;
fn = (DoFnInfo<?, ?>) SerializableUtils.deserializeFromByteArray(serializedFnInfo, null);
+ long startMillis = System.currentTimeMillis();
DoFnInvokers.tryInvokeSetupFor(fn.getDoFn(), options);
+ long elapsed = System.currentTimeMillis() - startMillis;
+ if (elapsed > 180_000) { // 3 min
+ // Work item could fail for long-running setup due to Dataflow worker lease timeout
+ LOG.warn(
+ String.format(
+ "DoFn.setup for %s ran for %d seconds.\nThis could cause Dataflow worker "
+ + "lease expire and failing the job. DoFn.Setup should not contain long "
+ + "running operations.",
+ fn.getDoFn().getClass(), elapsed / 1_000));
+ }
return fn;
}