m-trieu commented on code in PR #28755:
URL: https://github.com/apache/beam/pull/28755#discussion_r1362747723


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -478,8 +415,84 @@ <T, W extends BoundedWindow> void writePCollectionViewData(
         throws IOException;
   }
 
-  String getStateFamily(NameContext nameContext) {
-    return nameContext.userName() == null ? null : stateNameMap.get(nameContext.userName());
+  /**
+   * Execution states in Streaming are shared between multiple map-task executors. Thus this class
+   * needs to be thread safe for multiple writers. A single stage could have multiple executors
+   * running concurrently.
+   */
+  public static class StreamingModeExecutionState
+      extends DataflowOperationContext.DataflowExecutionState {
+
+    // AtomicLong is used because this value is written in two places:
+    // 1. The sampling thread calls takeSample to increment the time spent in this state
+    // 2. The reporting thread calls extractUpdate which reads the current sum *AND* sets it to 0.
+    private final AtomicLong totalMillisInState = new AtomicLong();
+
+    // The worker that created this state.  Used to report lulls back to the worker.
+    @SuppressWarnings("unused") // Affects a public api

Review Comment:
   done.
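
   For context on the `AtomicLong` comment in the hunk above, here is a minimal sketch of the two-writer pattern it describes. The class name `MillisInStateSketch`, the method signatures, and `runDemo` are hypothetical and exist only for illustration; only the field name `totalMillisInState` and the method names `takeSample` and `extractUpdate` come from the diff, and this is not the actual Beam implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the state class in the diff; not Beam code.
public class MillisInStateSketch {
  private final AtomicLong totalMillisInState = new AtomicLong();

  // Writer 1 (sampling thread): add the time observed since the last sample.
  // The parameter is an assumption made for this sketch.
  void takeSample(long millisSinceLastSample) {
    totalMillisInState.addAndGet(millisSinceLastSample);
  }

  // Writer 2 (reporting thread): read the current sum and reset it to 0 in
  // one atomic step, so a concurrent takeSample is never lost.
  long extractUpdate() {
    return totalMillisInState.getAndSet(0);
  }

  // Runs one sampler and one reporter concurrently and returns the total
  // number of millis that were reported.
  static long runDemo() {
    MillisInStateSketch state = new MillisInStateSketch();
    int samples = 100_000;
    long[] reported = {0};

    Thread sampler = new Thread(() -> {
      for (int i = 0; i < samples; i++) {
        state.takeSample(1);
      }
    });
    Thread reporter = new Thread(() -> {
      while (sampler.isAlive()) {
        reported[0] += state.extractUpdate();
      }
    });

    sampler.start();
    reporter.start();
    try {
      sampler.join();
      reporter.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    // Drain whatever was sampled after the reporter's last extract.
    return reported[0] + state.extractUpdate();
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```

   With a plain `long` and a separate read followed by a write of 0, a sample that lands between the two steps would be dropped. `getAndSet(0)` closes that window, which is why every sampled milli ends up in exactly one reported update.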


