This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 62bdf56e210 Issue warning on long running DoFn.Setup on legacy Dataflow runner (#34215)
62bdf56e210 is described below

commit 62bdf56e210e6ea0e3be2bfad180efc19b4e03aa
Author: Yi Hu <[email protected]>
AuthorDate: Tue Mar 11 15:28:18 2025 -0400

    Issue warning on long running DoFn.Setup on legacy Dataflow runner (#34215)
---
 .../beam/runners/dataflow/worker/DoFnInstanceManagers.java | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DoFnInstanceManagers.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DoFnInstanceManagers.java
index 28fc1df3958..60a1b067cc5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DoFnInstanceManagers.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DoFnInstanceManagers.java
@@ -23,12 +23,15 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.DoFnInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Common {@link DoFnInstanceManager} implementations. */
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 public class DoFnInstanceManagers {
+  private static final Logger LOG = LoggerFactory.getLogger(DoFnInstanceManagers.class);
   /**
    * Returns a {@link DoFnInstanceManager} that returns {@link DoFnInfo} instances obtained by
    * deserializing the provided bytes. {@link DoFnInstanceManager} will call {@link DoFn.Setup} as
@@ -83,7 +86,18 @@ public class DoFnInstanceManagers {
     private DoFnInfo<?, ?> deserializeCopy() throws Exception {
       DoFnInfo<?, ?> fn;
       fn = (DoFnInfo<?, ?>) SerializableUtils.deserializeFromByteArray(serializedFnInfo, null);
+      long startMillis = System.currentTimeMillis();
       DoFnInvokers.tryInvokeSetupFor(fn.getDoFn(), options);
+      long elapsed = System.currentTimeMillis() - startMillis;
+      if (elapsed > 180_000) { // 3 min
+        // Work item could fail for long-running setup due to Dataflow worker lease timeout
+        LOG.warn(
+            String.format(
+                "DoFn.setup for %s ran for %d seconds.\nThis could cause Dataflow worker "
+                    + "lease expire and failing the job. DoFn.Setup should not contain long "
+                    + "running operations.",
+                fn.getDoFn().getClass(), elapsed / 1_000));
+      }
       return fn;
     }
 

Reply via email to