scwhittle commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3497645514


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -243,50 +291,120 @@ public byte[] getCurrentRecordOffset() {
     return checkStateNotNull(activeReader).getCurrentRecordOffset();
   }
 
+  /** Reset context before using it on a new bundle */
+  public void reset() {
+    this.executedWorks = new ArrayList<>();
+    this.outputBuilders = new ArrayList<>();
+    this.accumulatedCallbacks = new HashMap<>();
+    // Work from prior bundles might have a reference to the old 
workBatchFailed.
+    // If the work gets retried it'll get the new workBatchFailed to notify 
failure.
+    this.workBatchFailed = new AtomicBoolean(false);
+    this.sideInputCache.clear();
+    this.activeStateReader = null;
+    this.activeReader = null;
+    this.keyCoder = null;
+    this.workExecutor = null;
+    this.workQueueExecutor = null;
+    this.budgetHandle = null;
+    this.keyTransitionListener = null;
+    this.work = null;
+    this.key = null;
+    this.outputBuilder = null;
+    this.sideInputStateFetcher = null;
+    this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
+    clearSinkFullHint();
+    this.stateBytesRead = 0;
+  }
+
   public void start(
-      @Nullable Object key,
       Work work,
       WindmillStateReader stateReader,
-      SideInputStateFetcher sideInputStateFetcher,
-      Windmill.WorkItemCommitRequest.Builder outputBuilder,
-      WorkExecutor workExecutor) {
-    this.key = key;
-    this.work = work;
+      WorkExecutor workExecutor,
+      BoundedQueueExecutor workQueueExecutor,
+      BoundedQueueExecutorWorkHandle budgetHandle,
+      @Nullable Coder<?> keyCoder,
+      KeyTransitionListener keyTransitionListener) {
+    reset();
+    this.keyCoder = keyCoder;
     this.workExecutor = workExecutor;
-    this.finishKeyCalled = false;
-    this.computationKey = WindmillComputationKey.create(computationId, 
work.getShardedKey());
-    this.sideInputStateFetcher = sideInputStateFetcher;
+    this.workQueueExecutor = workQueueExecutor;
+    this.budgetHandle = budgetHandle;
+    this.keyTransitionListener = keyTransitionListener;
+
     StreamingGlobalConfig config = globalConfigHandle.getConfig();
     // Snapshot the limits for entire bundle processing.
     this.operationalLimits = config.operationalLimits();
-    this.outputBuilder = outputBuilder;
-    this.sideInputCache.clear();
-    this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
-    clearSinkFullHint();
 
-    Instant processingTime = 
computeProcessingTime(work.getWorkItem().getTimers().getTimersList());
+    startForNewKey(work, stateReader);
+  }
 
-    Collection<? extends StepContext> stepContexts = getAllStepContexts();
-    if (!stepContexts.isEmpty()) {
-      // This must be only created once for the workItem as token validation 
will fail if the same
-      // work token is reused.
-      WindmillStateCache.ForKey cacheForKey =
-          stateCache.forKey(getComputationKey(), 
getWorkItem().getCacheToken(), getWorkToken());
-      for (StepContext stepContext : stepContexts) {
-        stepContext.start(stateReader, processingTime, cacheForKey, 
work.watermarks());
+  private @Nullable Object decodeKey(Work work) {
+    // If the read output KVs, then we can decode Windmill's byte key into 
userland
+    // key object and provide it to the execution context for use with per-key 
state.
+    // Otherwise, we pass null.
+    //
+    // The coder type that will be present is:
+    //     WindowedValueCoder(TimerOrElementCoder(KvCoder))
+    if (keyCoder != null) {
+      try {
+        return keyCoder.decode(work.getWorkItem().getKey().newInput(), 
Coder.Context.OUTER);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to decode key during processing", 
e);

Review Comment:
   Should we wrap this as a CoderException instead of a RuntimeException?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -759,18 +741,50 @@ public List<Windmill.WorkItemCommitRequest> 
getWorkItemCommits() {
     return commits;
   }
 
-  public Map<Long, Pair<Instant, Runnable>> getAccumulatedCallbacks() {
-    return accumulatedCallbacks;
+  // Returns list of Work that was executed in the bundle
+  public List<Work> getExecutedWorks() {
+    return executedWorks;
   }
 
+  // Returns finalization callbacks recorded during the bundle execution
+  public Map<Long, Pair<Instant, Runnable>> getFinalizationCallbacks() {
+    return finalizationCallbacks;
+  }
+
+  // Returns the current key being processed or null if an unkeyed stage.
   public @Nullable Object getKey() {
     return key;
   }
 
+  // Returns the current Work being processed.
   public Work getWork() {
     return checkStateNotNull(work);
   }
 
+  // Returns the serialized windmill key for the current Work
+  public @Nullable ByteString getSerializedKey() {
+    return work == null ? null : work.getWorkItem().getKey();
+  }
+
+  // Returns the serialized windmill key for the current Work
+  public WindmillComputationKey getComputationKey() {
+    return checkStateNotNull(computationKey);
+  }
+
+  // Returns the windmill work token for the current Work
+  public long getWorkToken() {
+    return getWorkItem().getWorkToken();
+  }
+
+  // Returns the windmill WorkItem proto for the current Work
+  public Windmill.WorkItem getWorkItem() {
+    return checkStateNotNull(

Review Comment:
   How about moving this checkStateNotNull message to getWork() and then just 
having `return getWork().getWorkItem();` here?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3511,8 +3520,8 @@ public void testExceptionInvalidatesCache() throws 
Exception {
     }
 
     // Ensure that the invalidated dofn had tearDown called on them.
-    assertEquals(1, TestExceptionInvalidatesCacheFn.tearDownCallCount.get());
-    assertEquals(2, TestExceptionInvalidatesCacheFn.setupCallCount.get());
+    assertEquals(2, TestExceptionInvalidatesCacheFn.tearDownCallCount.get());

Review Comment:
   So it seems that we now skip caching this DoFn in more cases, namely for 
things that might happen in flushInternal, which was previously separate from 
user code execution?
   
   For the multi-key case this seems needed because we haven't called 
finishBundle yet and thus have an incomplete DoFn lifecycle. However, if it is 
the final key (or only key) within a batch, we may not be caching DoFns as 
aggressively as before, even though they are still valid to use.
   
   Could we defer the final finishKey if advance is going to return false? I'm 
worried that there might be more effects on single-key processing than we 
realize if there are cases where checkpoints do have errors, since DoFn 
construction is expensive.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to