This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a45b0dfc5d6 Remove cancelled tasks from ReadOperation queue when 
shutting down (#34335)
a45b0dfc5d6 is described below

commit a45b0dfc5d67d2e1ccc1581f5656e777d41e23f7
Author: Danny McCormick <[email protected]>
AuthorDate: Wed Mar 19 08:46:27 2025 -0400

    Remove cancelled tasks from ReadOperation queue when shutting down (#34335)
    
    * Remove cancelled tasks from ReadOperation queue when shutting down
    
    * Try shutdowNow
    
    * spotless
---
 .../dataflow/worker/util/common/worker/ReadOperation.java    | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java
index 1ee8f2bc843..d6b020483d4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java
@@ -225,8 +225,16 @@ public class ReadOperation extends Operation {
         if (!scheduler.isTerminated()) {
           LOG.error(
               "Failed to terminate periodic progress reporting in 1 minute. "
-                  + "Waiting for it to terminate indefinitely...");
-          scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+                  + "Waiting for it to terminate 10 minutes before forcing");
+          scheduler.awaitTermination(10, TimeUnit.MINUTES);
+          if (!scheduler.isTerminated()) {
+            LOG.error(
+                "Failed to terminate periodic progress reporting in 10 "
+                    + "minutes. Trying to force termination then waiting "
+                    + "indefinitely...");
+            scheduler.shutdownNow();
+            scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+          }
           LOG.info("Periodic progress reporting terminated.");
         }
       }

Reply via email to