arunpandianp commented on code in PR #30270:
URL: https://github.com/apache/beam/pull/30270#discussion_r1484672554
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -313,19 +325,24 @@ public Closeable enterState(ExecutionState newState) {
if (isDataflowProcessElementState) {
DataflowExecutionState newDFState = (DataflowExecutionState) newState;
if (newDFState.getStepName() != null &&
newDFState.getStepName().userName() != null) {
- if (this.activeMessageMetadata != null) {
- recordActiveMessageInProcessingTimesMap();
+ synchronized (this) {
+ if (this.activeMessageMetadata != null) {
Review Comment:
```suggestion
```
nit: We can remove this check here since
`recordActiveMessageInProcessingTimesMap` also has a null check inside
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -313,19 +325,24 @@ public Closeable enterState(ExecutionState newState) {
if (isDataflowProcessElementState) {
DataflowExecutionState newDFState = (DataflowExecutionState) newState;
if (newDFState.getStepName() != null &&
newDFState.getStepName().userName() != null) {
- if (this.activeMessageMetadata != null) {
- recordActiveMessageInProcessingTimesMap();
+ synchronized (this) {
+ if (this.activeMessageMetadata != null) {
+ recordActiveMessageInProcessingTimesMap();
+ }
+ this.activeMessageMetadata =
+ ActiveMessageMetadata.create(
+ newDFState.getStepName().userName(), clock.getMillis());
}
- this.activeMessageMetadata =
-
ActiveMessageMetadata.create(newDFState.getStepName().userName(),
clock.getMillis());
}
elementExecutionTracker.enter(newDFState.getStepName());
}
return () -> {
if (isDataflowProcessElementState) {
- if (this.activeMessageMetadata != null) {
- recordActiveMessageInProcessingTimesMap();
+ synchronized (this) {
+ if (this.activeMessageMetadata != null) {
Review Comment:
```suggestion
```
check can be removed since recordActiveMessageInProcessingTimesMap has a
null check inside.
Removing the check also allows removing the `synchronized (this) {` since
recordActiveMessageInProcessingTimesMap synchronizes.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -361,7 +387,9 @@ private void recordActiveMessageInProcessingTimesMap() {
if (v == null) {
v = new IntSummaryStatistics();
}
- v.accept((int) (System.currentTimeMillis() -
this.activeMessageMetadata.startTime()));
+ synchronized (this) {
Review Comment:
```suggestion
```
This is redundant since compute is synchronous and the callback is called
synchronously.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]