scwhittle commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1620206256
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -137,42 +151,52 @@ public StreamingModeExecutionContext(
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
this.backlogBytes = UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN;
- this.workIsFailed = () -> Boolean.FALSE;
}
@VisibleForTesting
- public long getBacklogBytes() {
+ public final long getBacklogBytes() {
return backlogBytes;
}
public boolean workIsFailed() {
- return workIsFailed.get();
+ return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
Review Comment:
It seems like work should be marked @Nullable, since the constructor does not
set it to a non-null value.
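A minimal sketch of the pattern under discussion, assuming hypothetical stand-in
classes (FakeWork and ContextSketch are not Beam classes, and start() is only an
illustrative setter). The field is left unset by the constructor, so the accessor
has to tolerate null; in the real class the field would carry the @Nullable
annotation rather than the comment used here.

```java
import java.util.Optional;

// Hypothetical stand-in for Work; only the isFailed() accessor is modeled.
final class FakeWork {
  private final boolean failed;

  FakeWork(boolean failed) {
    this.failed = failed;
  }

  boolean isFailed() {
    return failed;
  }
}

// Hypothetical stand-in for the execution context.
final class ContextSketch {
  // Not assigned in the constructor, so it is null until start() is called.
  // In real code this is where @Nullable would go.
  private /* @Nullable */ FakeWork work;

  void start(FakeWork work) {
    this.work = work;
  }

  boolean workIsFailed() {
    // Treats "no work assigned yet" as "not failed".
    return Optional.ofNullable(work).map(FakeWork::isFailed).orElse(false);
  }
}
```

With the annotation in place, the nullness checker can flag any other use of the
field that forgets the null case.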
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -249,4 +308,42 @@ private boolean isCommitPending() {
abstract Instant startTime();
}
+
+ @AutoValue
+ public abstract static class ProcessingContext {
+
+ private static ProcessingContext.Builder builder(
+ String computationId,
+ BiFunction<String, KeyedGetDataRequest, KeyedGetDataResponse> getKeyedDataFn) {
+ return new AutoValue_Work_ProcessingContext.Builder()
+ .setComputationId(computationId)
+ .setKeyedDataFetcher(
+ request -> Optional.ofNullable(getKeyedDataFn.apply(computationId, request)));
+ }
+
+ /** Computation that the {@link Work} belongs to. */
+ public abstract String computationId();
+
+ /** Handles GetData requests to streaming backend. */
+ public abstract Function<KeyedGetDataRequest, Optional<KeyedGetDataResponse>>
+ keyedDataFetcher();
+
+ /**
+ * {@link WorkCommitter} that commits completed work to the backend Windmill worker handling the
+ * {@link WorkItem}.
+ */
+ public abstract Consumer<Commit> workCommitter();
Review Comment:
What happens if the work committer is not set on the builder before building?
Do we end up with a null pointer even though this isn't nullable, or does the
build fail?
Should we just get rid of the builder and have a create method that sets all
the fields? From the couple of spots I could see, we always set the work
committer at the same spot, not lazily.
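To make the two options concrete, here is a hand-written sketch, not the code
AutoValue generates. It assumes the usual AutoValue behavior, as I understand it,
that build() throws IllegalStateException when a required (non-nullable,
non-Optional) property was never set. ProcessingContextSketch and its
Consumer<String> committer are hypothetical simplifications of the types in the
diff.

```java
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical, simplified imitation of an AutoValue class with a builder.
final class ProcessingContextSketch {
  private final String computationId;
  private final Consumer<String> workCommitter;

  private ProcessingContextSketch(String computationId, Consumer<String> workCommitter) {
    this.computationId = computationId;
    this.workCommitter = workCommitter;
  }

  // Option suggested in the review: one factory that requires every field up front.
  static ProcessingContextSketch create(String computationId, Consumer<String> workCommitter) {
    return new ProcessingContextSketch(
        Objects.requireNonNull(computationId, "computationId"),
        Objects.requireNonNull(workCommitter, "workCommitter"));
  }

  static Builder builder() {
    return new Builder();
  }

  String computationId() {
    return computationId;
  }

  Consumer<String> workCommitter() {
    return workCommitter;
  }

  static final class Builder {
    private String computationId;
    private Consumer<String> workCommitter;

    Builder setComputationId(String computationId) {
      this.computationId = Objects.requireNonNull(computationId);
      return this;
    }

    Builder setWorkCommitter(Consumer<String> workCommitter) {
      this.workCommitter = Objects.requireNonNull(workCommitter);
      return this;
    }

    // Mirrors the assumed AutoValue behavior: fail at build time, not at first use.
    ProcessingContextSketch build() {
      StringBuilder missing = new StringBuilder();
      if (computationId == null) {
        missing.append(" computationId");
      }
      if (workCommitter == null) {
        missing.append(" workCommitter");
      }
      if (missing.length() > 0) {
        throw new IllegalStateException("Missing required properties:" + missing);
      }
      return new ProcessingContextSketch(computationId, workCommitter);
    }
  }
}
```

Under that assumption the builder fails at build() instead of producing a null
committer, but the omission only shows up at runtime; the create method makes it
a compile-time requirement to pass every field.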
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -285,27 +294,27 @@ private <T> SideInput<T> fetchSideInputFromWindmill(
}
public Iterable<Windmill.GlobalDataId> getSideInputNotifications() {
- return work.getGlobalDataIdNotificationsList();
+ return work.getWorkItem().getGlobalDataIdNotificationsList();
}
private List<Timer> getFiredTimers() {
- return work.getTimers().getTimersList();
+ return work.getWorkItem().getTimers().getTimersList();
}
public @Nullable ByteString getSerializedKey() {
- return work == null ? null : work.getKey();
+ return work == null ? null : work.getWorkItem().getKey();
Review Comment:
Nullness checking is disabled for this class, so this should keep checking
work for null (the check is still here; just noting that it should stay).
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -83,19 +87,30 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** {@link DataflowExecutionContext} for use in streaming mode. */
+/**
+ * {@link DataflowExecutionContext} for use in streaming mode. Contains cached readers and Beam
+ * state pertaining to a processing its owning computation. Can be reused across processing
+ * different WorkItems for the same computation.
+ */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Review Comment:
Maybe this suppression can be removed, to catch the missing @Nullable on work
and perhaps other things?