scwhittle commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1620206256


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -137,42 +151,52 @@ public StreamingModeExecutionContext(
     this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
     this.stateCache = stateCache;
     this.backlogBytes = UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN;
-    this.workIsFailed = () -> Boolean.FALSE;
   }
 
   @VisibleForTesting
-  public long getBacklogBytes() {
+  public final long getBacklogBytes() {
     return backlogBytes;
   }
 
   public boolean workIsFailed() {
-    return workIsFailed.get();
+    return Optional.ofNullable(work).map(Work::isFailed).orElse(false);

Review Comment:
   It seems like `work` should be marked `@Nullable`, since it is not set to a 
non-null value in the constructor.
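
   A minimal sketch of the shape I have in mind. `WorkSketch` and `ExecutionContextSketch` are hypothetical stand-ins, not the real Beam classes, and the annotation is shown as a comment so the snippet compiles without the Checker Framework on the classpath:

```java
import java.util.Optional;

// Hypothetical stand-in for Work; only the failure flag is modelled.
final class WorkSketch {
  private final boolean failed;

  WorkSketch(boolean failed) {
    this.failed = failed;
  }

  boolean isFailed() {
    return failed;
  }
}

// Hypothetical stand-in for StreamingModeExecutionContext.
final class ExecutionContextSketch {
  // Not assigned in the constructor, so it stays null until start() is called.
  // In the real class this is where @Nullable would go.
  private /* @Nullable */ WorkSketch work;

  void start(WorkSketch work) {
    this.work = work;
  }

  // Same shape as the method in the diff: a null work counts as "not failed".
  boolean workIsFailed() {
    return Optional.ofNullable(work).map(WorkSketch::isFailed).orElse(false);
  }
}
```

   With the field annotated, the `Optional.ofNullable` call documents a case the type already admits, rather than guarding against something the declaration claims cannot happen.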



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -249,4 +308,42 @@ private boolean isCommitPending() {
 
     abstract Instant startTime();
   }
+
+  @AutoValue
+  public abstract static class ProcessingContext {
+
+    private static ProcessingContext.Builder builder(
+        String computationId,
+        BiFunction<String, KeyedGetDataRequest, KeyedGetDataResponse> getKeyedDataFn) {
+      return new AutoValue_Work_ProcessingContext.Builder()
+          .setComputationId(computationId)
+          .setKeyedDataFetcher(
+              request -> Optional.ofNullable(getKeyedDataFn.apply(computationId, request)));
+    }
+
+    /** Computation that the {@link Work} belongs to. */
+    public abstract String computationId();
+
+    /** Handles GetData requests to streaming backend. */
+    public abstract Function<KeyedGetDataRequest, Optional<KeyedGetDataResponse>>
+        keyedDataFetcher();
+
+    /**
+     * {@link WorkCommitter} that commits completed work to the backend Windmill worker handling the
+     * {@link WorkItem}.
+     */
+    public abstract Consumer<Commit> workCommitter();

Review Comment:
   What happens if the work committer is not set on the builder before building? 
Do we end up with a null pointer even though this isn't nullable, or does 
build() fail?

   Should we just get rid of the builder and have a create method that sets 
all the fields? In the couple of spots I could see, we always set the committer 
at the same spot, not lazily.
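
   As far as I know, an AutoValue-generated `build()` throws `IllegalStateException` when a required (non-nullable) property was never set, so a missing committer would only show up at runtime. A rough sketch of the factory-method alternative follows. It is hand-written without AutoValue, `ProcessingContextSketch` is a hypothetical name, and `String` stands in for the request, response and commit types:

```java
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for ProcessingContext; String replaces the Windmill types.
final class ProcessingContextSketch {
  private final String computationId;
  private final Function<String, Optional<String>> keyedDataFetcher;
  private final Consumer<String> workCommitter;

  private ProcessingContextSketch(
      String computationId,
      Function<String, Optional<String>> keyedDataFetcher,
      Consumer<String> workCommitter) {
    this.computationId = computationId;
    this.keyedDataFetcher = keyedDataFetcher;
    this.workCommitter = workCommitter;
  }

  // Single factory method: every field is supplied at the call site, so there is
  // no intermediate builder state in which the committer can be missing.
  static ProcessingContextSketch create(
      String computationId,
      BiFunction<String, String, String> getKeyedDataFn,
      Consumer<String> workCommitter) {
    Objects.requireNonNull(computationId, "computationId");
    Objects.requireNonNull(getKeyedDataFn, "getKeyedDataFn");
    Objects.requireNonNull(workCommitter, "workCommitter");
    return new ProcessingContextSketch(
        computationId,
        // Bind the computation id once, as the builder() method in the diff does.
        request -> Optional.ofNullable(getKeyedDataFn.apply(computationId, request)),
        workCommitter);
  }

  String computationId() {
    return computationId;
  }

  Function<String, Optional<String>> keyedDataFetcher() {
    return keyedDataFetcher;
  }

  Consumer<String> workCommitter() {
    return workCommitter;
  }
}
```

   With this shape, forgetting the committer is a compile error at the call site (a missing argument), and passing null fails immediately in `create` instead of later when the commit is attempted.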



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -285,27 +294,27 @@ private <T> SideInput<T> fetchSideInputFromWindmill(
   }
 
   public Iterable<Windmill.GlobalDataId> getSideInputNotifications() {
-    return work.getGlobalDataIdNotificationsList();
+    return work.getWorkItem().getGlobalDataIdNotificationsList();
   }
 
   private List<Timer> getFiredTimers() {
-    return work.getTimers().getTimersList();
+    return work.getWorkItem().getTimers().getTimersList();
   }
 
   public @Nullable ByteString getSerializedKey() {
-    return work == null ? null : work.getKey();
+    return work == null ? null : work.getWorkItem().getKey();

Review Comment:
   Nullness checking is disabled for this class, so this should keep checking 
for a null `work` (the check is still here; just noting that it should stay).



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -83,19 +87,30 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** {@link DataflowExecutionContext} for use in streaming mode. */
+/**
+ * {@link DataflowExecutionContext} for use in streaming mode. Contains cached readers and Beam
+ * state pertaining to a processing its owning computation. Can be reused across processing
+ * different WorkItems for the same computation.
+ */
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   Maybe this suppression can be removed, to catch the missing `@Nullable` on 
`work` and perhaps other issues?



