clmccart commented on code in PR #30270:
URL: https://github.com/apache/beam/pull/30270#discussion_r1484670241


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -341,8 +342,8 @@ public Optional<ActiveMessageMetadata> 
getActiveMessageMetadata() {
       return Optional.ofNullable(activeMessageMetadata);
     }
 
-    public Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
-      Map<String, IntSummaryStatistics> processingTimesCopy = 
processingTimesByStep;
+    public synchronized Map<String, IntSummaryStatistics> 
getProcessingTimesByStepCopy() {
+      Map<String, IntSummaryStatistics> processingTimesCopy = new 
HashMap<>(processingTimesByStep);

Review Comment:
   the sampler will also ask to read activeMessageMetadata so i think that 
needs to be synchronized too.
   
   as for deep copying IntSummaryStatistics values, the compiler doesn't like 
the clone: `'clone()' has protected access in 'java.lang.Object'`
   
   would something like this work?
   
         Map<String, IntSummaryStatistics> processingTimesCopy =
             processingTimesByStep.entrySet().stream()
                 .collect(Collectors.toMap(e -> e.getKey(), e -> {
                   IntSummaryStatistics clone = new IntSummaryStatistics();
                   clone.combine(e.getValue());
                   return clone;
                 }));



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to