This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bfc64d5c14a Fix error when ActiveWorkRefresher processed empty
heartbeat map. (#32078)
bfc64d5c14a is described below
commit bfc64d5c14adea209364d48b0fe9b4e8ba6eaab5
Author: Sam Whittle <[email protected]>
AuthorDate: Mon Aug 5 11:34:45 2024 +0200
Fix error when ActiveWorkRefresher processed empty heartbeat map. (#32078)
---
.../windmill/work/refresh/ActiveWorkRefresher.java | 3 ++
.../work/refresh/ActiveWorkRefresherTest.java | 38 +++++++++++++++++++---
2 files changed, 37 insertions(+), 4 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java
index 499d2e5b694..781285def02 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java
@@ -130,6 +130,9 @@ public final class ActiveWorkRefresher {
Instant refreshDeadline =
clock.get().minus(Duration.millis(activeWorkRefreshPeriodMillis));
Map<HeartbeatSender, Heartbeats> heartbeatsBySender =
aggregateHeartbeatsBySender(refreshDeadline);
+ if (heartbeatsBySender.isEmpty()) {
+ return;
+ }
List<CompletableFuture<Void>> fanOutRefreshActiveWork = new ArrayList<>();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
index 9dce3392c60..5efb2421fe6 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;
import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
@@ -194,10 +195,13 @@ public class ActiveWorkRefresherTest {
assertThat(heartbeatRequests)
.comparingElementsUsing(
Correspondence.from(
- (Windmill.HeartbeatRequest h, Work w) ->
- h.getWorkToken() == w.getWorkItem().getWorkToken()
- && h.getCacheToken() ==
w.getWorkItem().getWorkToken()
- && h.getShardingKey() ==
w.getWorkItem().getShardingKey(),
+ (Windmill.HeartbeatRequest h, Work w) -> {
+ assert h != null;
+ assert w != null;
+ return h.getWorkToken() == w.getWorkItem().getWorkToken()
+ && h.getCacheToken() == w.getWorkItem().getWorkToken()
+ && h.getShardingKey() ==
w.getWorkItem().getShardingKey();
+ },
"heartbeatRequest's and Work's workTokens, cacheTokens, and
shardingKeys should be equal."))
.containsExactlyElementsIn(work);
}
@@ -207,6 +211,32 @@ public class ActiveWorkRefresherTest {
workIsProcessed.countDown();
}
+ @Test
+ public void testEmptyActiveWorkRefresh() throws InterruptedException {
+ int activeWorkRefreshPeriodMillis = 100;
+
+ List<ComputationState> computations = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ ComputationState computationState = createComputationState(i);
+ computations.add(computationState);
+ }
+
+ CountDownLatch heartbeatsSent = new CountDownLatch(1);
+ TestClock fakeClock = new TestClock(Instant.now());
+ ActiveWorkRefresher activeWorkRefresher =
+ createActiveWorkRefresher(
+ fakeClock::now,
+ activeWorkRefreshPeriodMillis,
+ 0,
+ () -> computations,
+ heartbeats -> heartbeatsSent::countDown);
+
+ activeWorkRefresher.start();
+ fakeClock.advance(Duration.millis(activeWorkRefreshPeriodMillis * 2));
+ assertFalse(heartbeatsSent.await(500, TimeUnit.MILLISECONDS));
+ activeWorkRefresher.stop();
+ }
+
@Test
public void testInvalidateStuckCommits() throws InterruptedException {
int stuckCommitDurationMillis = 100;