This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a45b0dfc5d6 Remove cancelled tasks from ReadOperation queue when
shutting down (#34335)
a45b0dfc5d6 is described below
commit a45b0dfc5d67d2e1ccc1581f5656e777d41e23f7
Author: Danny McCormick <[email protected]>
AuthorDate: Wed Mar 19 08:46:27 2025 -0400
Remove cancelled tasks from ReadOperation queue when shutting down (#34335)
* Remove cancelled tasks from ReadOperation queue when shutting down
* Try shutdownNow
* spotless
---
.../dataflow/worker/util/common/worker/ReadOperation.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java
index 1ee8f2bc843..d6b020483d4 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ReadOperation.java
@@ -225,8 +225,16 @@ public class ReadOperation extends Operation {
if (!scheduler.isTerminated()) {
LOG.error(
"Failed to terminate periodic progress reporting in 1 minute. "
- + "Waiting for it to terminate indefinitely...");
- scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ + "Waiting for it to terminate 10 minutes before forcing");
+ scheduler.awaitTermination(10, TimeUnit.MINUTES);
+ if (!scheduler.isTerminated()) {
+ LOG.error(
+ "Failed to terminate periodic progress reporting in 10 "
+ + "minutes. Trying to force termination then waiting "
+ + "indefinitely...");
+ scheduler.shutdownNow();
+ scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ }
LOG.info("Periodic progress reporting terminated.");
}
}