arunpandianp commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3370654601


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -555,7 +666,92 @@ public Map<Long, Pair<Instant, Runnable>> flushState() {
       // RestrictionTracker.getProgress() or GetSize() are not defined.
       outputBuilder.setSourceBacklogBytes(backlogBytes);
     }
-    return callbacks;
+
+    this.accumulatedCallbacks.putAll(callbacks);
+
+    outputBuilder.setSourceBytesProcessed(
+        computeSourceBytesProcessed(sourceBytesProcessCounterName));
+  }
+
+  private final long computeSourceBytesProcessed(String 
sourceBytesCounterName) {
+    if (!(workExecutor instanceof DataflowMapTaskExecutor)) {
+      return 0L;
+    }
+    HashMap<String, ElementCounter> counters =
+        ((DataflowMapTaskExecutor) workExecutor)
+            .getReadOperation()
+            .receivers[0]
+            .getOutputCounters();
+
+    return Optional.ofNullable(counters.get(sourceBytesCounterName))
+        .map(counter -> ((OutputObjectAndByteCounter) 
counter).getByteCount().getAndReset())
+        .orElse(0L);
+  }
+
+  public Map<Long, Pair<Instant, Runnable>> flushState() {
+    return accumulatedCallbacks;
+  }
+
+  public boolean advance() {
+    return false;
+  }
+
+  private void startForNewKey(Work newWork, WindmillStateReader reader) {
+    if (keySwitchListener != null && this.work != null && this.work != 
newWork) {
+      keySwitchListener.onKeySwitch(this.work, newWork);
+    }
+    this.key = decodeKey(newWork);
+    this.work = newWork;
+    this.finishKeyCalled = false;
+    this.computationKey = WindmillComputationKey.create(computationId, 
newWork.getShardedKey());
+
+    this.outputBuilder = createOutputBuilder(newWork);
+    this.outputBuilders.add(this.outputBuilder);
+    newWork.setOnFailureListener(() -> this.workIsFailed = true);
+    this.executedWorks.add(newWork);
+
+    logHotKeyIfDetected(newWork, this.key);
+
+    // Note: We do NOT clear sideInputCache here, allowing Key B to reuse warm 
side inputs!
+
+    // Re-initialize state cache and state/timer internals across all step 
contexts
+    Instant processingTime =
+        
computeProcessingTime(newWork.getWorkItem().getTimers().getTimersList());
+    if (!getAllStepContexts().isEmpty()) {
+      // This must be only created once for a workItem as token validation 
will fail if the same

Review Comment:
   It is an old comment that i moved here. 
   
   stateCache.forKey internally checks if the input workToken is > the last 
seen workToken, else will invalidate the in-memory cache and returns a new 
ForKey cache. I think that is why the comment says do this once per workitem. 
   
   > I'm wondering if there are cases with retries etc that we could get the 
same key and possibly same work token. Should we guard against that somehow?
   
   IIUC, the worktoken check in stateCache.forKey guards against this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to