scwhittle commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3372601333


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -79,6 +82,8 @@ public final class Work implements RefreshableWork {
   private volatile TimedState currentState;
   private volatile boolean isFailed;
   private volatile String processingThreadName = "";
+  private final AtomicReference<@Nullable AtomicBoolean> onFailureListener =

Review Comment:
   Can this be combined with isFailed? If we know which group the work will be 
part of when we construct it, we could assign the shared AtomicBoolean at that 
point.
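   
   A minimal sketch of what I mean, not the actual Work class: `GroupedWork` 
and both constructors are hypothetical names, and only the failure flag is 
modeled.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Work; only the failure flag is modeled.
final class GroupedWork {
  private final AtomicBoolean failed;

  // Standalone work owns its own flag.
  GroupedWork() {
    this(new AtomicBoolean(false));
  }

  // Work that is known to belong to a group shares the group's flag.
  GroupedWork(AtomicBoolean sharedFailed) {
    this.failed = sharedFailed;
  }

  void setFailed() {
    failed.set(true);
  }

  boolean isFailed() {
    return failed.get();
  }
}
```

   With this shape, failing any work item in a group is visible through every 
other item constructed with the same flag, so no separate listener is needed.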



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########


Review Comment:
   Move this next to the other failure methods.
   
   Do we want this to reflect the shared state? If we are processing a group 
of keys and are on key C when key A, which was processed earlier, is marked as 
failed due to heartbeats, we'd want to stop processing key C. I think we'd 
need to return true here to notice that.
   
   If that's true, perhaps we can get rid of the separate isFailed and failure 
listener and just have a listener? It could be a volatile AtomicBoolean: if 
the listener is set, it merges and replaces its state, and if it is set 
externally to null, it creates a new AtomicBoolean.
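   
   Roughly something like the following. This is only a sketch: 
`FailureState` and `setSharedFlag` are hypothetical names, and resetting to an 
unfailed flag on null is an assumption about the desired behavior.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical holder; in the PR this state would live inside Work.
final class FailureState {
  // Volatile so isFailed() always reads through the currently attached flag.
  private volatile AtomicBoolean failed = new AtomicBoolean(false);

  // Attach a group's shared flag, or pass null to detach.
  synchronized void setSharedFlag(AtomicBoolean shared) {
    if (shared == null) {
      // Assumption: detaching starts over with a fresh, unfailed flag.
      failed = new AtomicBoolean(false);
      return;
    }
    // Merge: a failure recorded before attaching must not be lost.
    if (failed.get()) {
      shared.set(true);
    }
    failed = shared;
  }

  // Synchronized so a concurrent setSharedFlag cannot swap the flag mid-update.
  synchronized void setFailed() {
    failed.set(true);
  }

  boolean isFailed() {
    return failed.get();
  }
}
```

   Since isFailed() reads the shared flag, key C would notice that key A was 
failed without any extra callback.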



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java:
##########
@@ -57,22 +57,35 @@ public boolean start() throws IOException {
   @Override
   public boolean advance() throws IOException {
     if (context.workIsFailed()) {
-      throw new WorkItemCancelledException(context.getWorkItem().getShardingKey());
+      throw new WorkItemCancelledException(checkNotNull(context.getWorkItem()).getShardingKey());
     }
 
     while (true) {
       if (bundleIndex >= work.getMessageBundlesCount()) {
-        current = null;
+        // If elements are exhausted, try advancing the execution context to the next key in the
+        // group
         context.finishKey();
+        if (context.advance()) {
+          // Transition succeeded! Update iterator references to the new work item
+          this.work = context.getWork().getWorkItem();

Review Comment:
   How about a resetWork method that you call here and also use in the 
constructor? Then it will be harder to update one spot and forget the other.
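   
   For illustration only, with a simplified stand-in for the iterator: 
`ReaderIteratorSketch`, `onNextKey`, and the `List<String>` work type are 
hypothetical, and the real method would reset whatever per-work fields the 
iterator keeps.

```java
import java.util.List;

// Hypothetical, simplified stand-in for the reader iterator.
final class ReaderIteratorSketch {
  private List<String> work;
  private int bundleIndex;

  ReaderIteratorSketch(List<String> initialWork) {
    resetWork(initialWork);
  }

  // Single place that initializes every per-work field.
  private void resetWork(List<String> newWork) {
    this.work = newWork;
    this.bundleIndex = 0;
  }

  // Hypothetical hook for the point where advance() moves to the next key.
  void onNextKey(List<String> nextWork) {
    resetWork(nextWork);
  }

  boolean hasMoreBundles() {
    return bundleIndex < work.size();
  }

  void consumeBundle() {
    bundleIndex++;
  }
}
```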
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java:
##########


Review Comment:
   Sounds good, might be a nice simplification to follow up with.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -555,7 +666,92 @@ public Map<Long, Pair<Instant, Runnable>> flushState() {
       // RestrictionTracker.getProgress() or GetSize() are not defined.
       outputBuilder.setSourceBacklogBytes(backlogBytes);
     }
-    return callbacks;
+
+    this.accumulatedCallbacks.putAll(callbacks);
+
+    outputBuilder.setSourceBytesProcessed(
+        computeSourceBytesProcessed(sourceBytesProcessCounterName));
+  }
+
+  private final long computeSourceBytesProcessed(String sourceBytesCounterName) {
+    if (!(workExecutor instanceof DataflowMapTaskExecutor)) {
+      return 0L;
+    }
+    HashMap<String, ElementCounter> counters =
+        ((DataflowMapTaskExecutor) workExecutor)
+            .getReadOperation()
+            .receivers[0]
+            .getOutputCounters();
+
+    return Optional.ofNullable(counters.get(sourceBytesCounterName))
+        .map(counter -> ((OutputObjectAndByteCounter) counter).getByteCount().getAndReset())
+        .orElse(0L);
+  }
+
+  public Map<Long, Pair<Instant, Runnable>> flushState() {
+    return accumulatedCallbacks;
+  }
+
+  public boolean advance() {
+    return false;
+  }
+
+  private void startForNewKey(Work newWork, WindmillStateReader reader) {
+    if (keySwitchListener != null && this.work != null && this.work != newWork) {
+      keySwitchListener.onKeySwitch(this.work, newWork);
+    }
+    this.key = decodeKey(newWork);
+    this.work = newWork;
+    this.finishKeyCalled = false;
+    this.computationKey = WindmillComputationKey.create(computationId, newWork.getShardedKey());
+
+    this.outputBuilder = createOutputBuilder(newWork);
+    this.outputBuilders.add(this.outputBuilder);
+    newWork.setOnFailureListener(() -> this.workIsFailed = true);
+    this.executedWorks.add(newWork);
+
+    logHotKeyIfDetected(newWork, this.key);
+
+    // Note: We do NOT clear sideInputCache here, allowing Key B to reuse warm side inputs!
+
+    // Re-initialize state cache and state/timer internals across all step contexts
+    Instant processingTime =
+        computeProcessingTime(newWork.getWorkItem().getTimers().getTimersList());
+    if (!getAllStepContexts().isEmpty()) {
+      // This must be only created once for a workItem as token validation will fail if the same

Review Comment:
   Separate from the logic here, I was wondering if we want/need to try to 
prevent processing a batch with the same key multiple times.


