This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b923a67369e fix bug in getProcessingTimesByStepCopy (#30270)
b923a67369e is described below
commit b923a67369e0707575f974aa4cba700091aeb916
Author: clmccart <[email protected]>
AuthorDate: Tue Feb 13 01:22:11 2024 -0800
fix bug in getProcessingTimesByStepCopy (#30270)
Co-authored-by: Claire McCarthy <[email protected]>
---
.../dataflow/worker/DataflowExecutionContext.java | 42 +++++++++++++++-------
1 file changed, 29 insertions(+), 13 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 7d45295b2d8..080fa7c9dac 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -29,6 +29,8 @@ import java.util.IntSummaryStatistics;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StepContext;
@@ -177,6 +179,7 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
/** Dataflow specific {@link StepContext}. */
public abstract static class DataflowStepContext implements StepContext {
+
private final NameContext nameContext;
public DataflowStepContext(NameContext nameContext) {
@@ -253,10 +256,13 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
* Metadata on the message whose processing is currently being managed by
this tracker. If no
* message is actively being processed, activeMessageMetadata will be null.
*/
- @Nullable private ActiveMessageMetadata activeMessageMetadata = null;
+ @GuardedBy("this")
+ @Nullable
+ private ActiveMessageMetadata activeMessageMetadata = null;
private final MillisProvider clock = System::currentTimeMillis;
+ @GuardedBy("this")
private final Map<String, IntSummaryStatistics> processingTimesByStep =
new HashMap<>();
public DataflowExecutionStateTracker(
@@ -313,20 +319,19 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
if (isDataflowProcessElementState) {
DataflowExecutionState newDFState = (DataflowExecutionState) newState;
if (newDFState.getStepName() != null && newDFState.getStepName().userName() != null) {
- if (this.activeMessageMetadata != null) {
- recordActiveMessageInProcessingTimesMap();
+ recordActiveMessageInProcessingTimesMap();
+ synchronized (this) {
+ this.activeMessageMetadata =
+ ActiveMessageMetadata.create(
+ newDFState.getStepName().userName(), clock.getMillis());
}
- this.activeMessageMetadata =
- ActiveMessageMetadata.create(newDFState.getStepName().userName(), clock.getMillis());
}
elementExecutionTracker.enter(newDFState.getStepName());
}
return () -> {
if (isDataflowProcessElementState) {
- if (this.activeMessageMetadata != null) {
- recordActiveMessageInProcessingTimesMap();
- }
+ recordActiveMessageInProcessingTimesMap();
elementExecutionTracker.exit();
}
baseCloseable.close();
@@ -337,12 +342,21 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
return this.workItemId;
}
- public Optional<ActiveMessageMetadata> getActiveMessageMetadata() {
+ public synchronized Optional<ActiveMessageMetadata> getActiveMessageMetadata() {
return Optional.ofNullable(activeMessageMetadata);
}
- public Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
- Map<String, IntSummaryStatistics> processingTimesCopy = processingTimesByStep;
+ public synchronized Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
+ Map<String, IntSummaryStatistics> processingTimesCopy =
+ processingTimesByStep.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ e -> e.getKey(),
+ e -> {
+ IntSummaryStatistics clone = new IntSummaryStatistics();
+ clone.combine(e.getValue());
+ return clone;
+ }));
return processingTimesCopy;
}
@@ -351,17 +365,19 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
* processing times map. Sets the activeMessageMetadata to null after the
entry has been
* recorded.
*/
- private void recordActiveMessageInProcessingTimesMap() {
+ private synchronized void recordActiveMessageInProcessingTimesMap() {
if (this.activeMessageMetadata == null) {
return;
}
+ int processingTime =
+ (int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime());
this.processingTimesByStep.compute(
this.activeMessageMetadata.userStepName(),
(k, v) -> {
if (v == null) {
v = new IntSummaryStatistics();
}
- v.accept((int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime()));
+ v.accept(processingTime);
return v;
});
this.activeMessageMetadata = null;