This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f6322dae132 Fix ConcurrentModification exception possible in
DataflowExecutionStateSampler (#30993)
f6322dae132 is described below
commit f6322dae13230cd3e15da3cae7ac2158b5efc7fb
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Apr 16 19:16:34 2024 +0200
Fix ConcurrentModification exception possible in
DataflowExecutionStateSampler (#30993)
Also ensure that the result is not modified by observing it. Previously, each
call to getProcessingDistributionsForWorkId merged the active tracker's stats
into the stored completed-metrics map, which appeared unintended.
---
.../dataflow/worker/DataflowExecutionStateSampler.java | 14 ++++++++------
.../dataflow/worker/DataflowExecutionStateSamplerTest.java | 3 +++
2 files changed, 11 insertions(+), 6 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
index 1ff9a9be40d..80955185eea 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java
@@ -39,6 +39,7 @@ public final class DataflowExecutionStateSampler extends
ExecutionStateSampler {
private final ConcurrentHashMap<String, DataflowExecutionStateTracker>
activeTrackersByWorkId =
new ConcurrentHashMap<>();
+ // The maps within completedProcessingMetrics should not be modified.
private final ConcurrentHashMap<String, Map<String, IntSummaryStatistics>>
completedProcessingMetrics = new ConcurrentHashMap<>();
@@ -64,7 +65,7 @@ public final class DataflowExecutionStateSampler extends
ExecutionStateSampler {
this.activeTrackersByWorkId.put(dfTracker.getWorkItemId(), dfTracker);
}
- private static Map<String, IntSummaryStatistics> mergeStepStatsMaps(
+ private static void mergeStepStatsMaps(
Map<String, IntSummaryStatistics> map1, Map<String,
IntSummaryStatistics> map2) {
for (Entry<String, IntSummaryStatistics> steps : map2.entrySet()) {
map1.compute(
@@ -77,7 +78,6 @@ public final class DataflowExecutionStateSampler extends
ExecutionStateSampler {
return v;
});
}
- return map1;
}
@Override
@@ -118,13 +118,15 @@ public final class DataflowExecutionStateSampler extends
ExecutionStateSampler {
}
public Map<String, IntSummaryStatistics>
getProcessingDistributionsForWorkId(String workId) {
+ Map<String, IntSummaryStatistics> result;
DataflowExecutionStateTracker tracker = activeTrackersByWorkId.get(workId);
if (tracker == null) {
- return completedProcessingMetrics.getOrDefault(workId, new HashMap<>());
+ result = new HashMap<>();
+ } else {
+ result = tracker.getProcessingTimesByStepCopy();
}
- return mergeStepStatsMaps(
- completedProcessingMetrics.getOrDefault(workId, new HashMap<>()),
- tracker.getProcessingTimesByStepCopy());
+ mergeStepStatsMaps(result, completedProcessingMetrics.getOrDefault(workId,
new HashMap<>()));
+ return result;
}
public void resetForWorkId(String workId) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
index 920e37d40ec..ab5059bd937 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java
@@ -106,6 +106,9 @@ public class DataflowExecutionStateSamplerTest {
assertThat(sampler.getActiveMessageMetadataForWorkId(workId).get(),
equalTo(testMetadata));
assertThat(
sampler.getProcessingDistributionsForWorkId(workId),
equalTo(testCompletedProcessingTimes));
+ // Repeated calls should not modify the result.
+ assertThat(
+ sampler.getProcessingDistributionsForWorkId(workId),
equalTo(testCompletedProcessingTimes));
}
@Test