scwhittle commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1618365562


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -137,42 +151,52 @@ public StreamingModeExecutionContext(
     this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
     this.stateCache = stateCache;
     this.backlogBytes = UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN;
-    this.workIsFailed = () -> Boolean.FALSE;
   }
 
   @VisibleForTesting
-  public long getBacklogBytes() {
+  public final long getBacklogBytes() {
     return backlogBytes;
   }
 
   public boolean workIsFailed() {
-    return workIsFailed.get();
+    return Optional.ofNullable(work).map(Work::isFailed).orElse(false);

Review Comment:
   work is not marked Nullable, can the ofNullable be removed?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -285,27 +294,27 @@ private <T> SideInput<T> fetchSideInputFromWindmill(
   }
 
   public Iterable<Windmill.GlobalDataId> getSideInputNotifications() {
-    return work.getGlobalDataIdNotificationsList();
+    return work.getWorkItem().getGlobalDataIdNotificationsList();
   }
 
   private List<Timer> getFiredTimers() {
-    return work.getTimers().getTimersList();
+    return work.getWorkItem().getTimers().getTimersList();
   }
 
   public @Nullable ByteString getSerializedKey() {
-    return work == null ? null : work.getKey();
+    return work == null ? null : work.getWorkItem().getKey();

Review Comment:
   ditto, don't think work is null



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java:
##########
@@ -227,10 +270,8 @@ private void startWorkerMetadataConsumer() {
   }
 
   @VisibleForTesting
-  void finish() {
-    if (!started.compareAndSet(true, false)) {
-      return;
-    }
+  public synchronized void finish() {

Review Comment:
   might need to narrow the synchronized block if any of the shutdown methods 
below block on something that itself needs to acquire this lock



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -249,4 +310,49 @@ private boolean isCommitPending() {
 
     abstract Instant startTime();
   }
+
+  @AutoValue
+  public abstract static class ProcessingContext {
+
+    private static ProcessingContext.Builder builder(
+        String computationId,
+        BiFunction<String, KeyedGetDataRequest, KeyedGetDataResponse> 
getKeyedDataFn) {
+      return new AutoValue_Work_ProcessingContext.Builder()
+          .setComputationId(computationId)
+          .setKeyedDataFetcher(
+              request -> 
Optional.ofNullable(getKeyedDataFn.apply(computationId, request)));
+    }
+
+    /** Computation that the {@link Work} belongs to. */
+    public abstract String computationId();
+
+    /**
+     * {@link WindmillStream.GetDataStream} that connects to the backend 
Windmill worker handling

Review Comment:
   update comment to just say how to handle GetData requests (not necessarily a 
stream for appliance)



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java:
##########
@@ -20,38 +20,31 @@
 import java.util.Collection;
 import java.util.function.Consumer;
 import javax.annotation.CheckReturnValue;
+import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
+import org.apache.beam.runners.dataflow.worker.streaming.Work;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
 import org.apache.beam.sdk.annotations.Internal;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.Instant;
 
 @FunctionalInterface
 @CheckReturnValue
 @Internal
-public interface WorkItemProcessor {
+public interface WorkItemScheduler {

Review Comment:
   the different packages are a little confusing to me
   
   why is this not in processing? 
   what is the difference between dataflow.worker.streaming and 
dataflow.worker.windmill.work etc?
   



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java:
##########
@@ -975,18 +997,26 @@ public void testFailedWorkItemsAbort() throws Exception {
             .setSourceState(
                 Windmill.SourceState.newBuilder().setState(state).build()) // 
Source state.
             .build();
-    Work dummyWork = Work.create(workItem, Instant::now, 
Collections.emptyList(), unused -> {});
-
+    WorkCommitter workCommitter = mock(WorkCommitter.class);
+    doNothing().when(workCommitter).commit(any(Commit.class));
+    WindmillStream.GetDataStream getDataStream = 
mock(WindmillStream.GetDataStream.class);
+    when(getDataStream.requestKeyedData(anyString(), any()))
+        .thenReturn(Windmill.KeyedGetDataResponse.getDefaultInstance());
+    Work dummyWork =
+        Work.create(
+            workItem,
+            Watermarks.builder().setInputDataWatermark(new Instant(0)).build(),
+            Work.createProcessingContext(COMPUTATION_ID, 
getDataStream::requestKeyedData)

Review Comment:
   instead of mocking getdata and commit, just fake them out with lambdas:
               Work.createProcessingContext(
                       COMPUTATION_ID, (a, b) -> 
Windmill.KeyedGetDataResponse.getDefaultInstance())
                   .setWorkCommitter(ignored -> {})



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitterTest.java:
##########
@@ -52,17 +59,25 @@ public class StreamingApplianceWorkCommitterTest {
   private FakeWindmillServer fakeWindmillServer;
   private StreamingApplianceWorkCommitter workCommitter;
 
-  private static Work createMockWork(long workToken, Consumer<Work> 
processWorkFn) {
+  private static Work createMockWork(long workToken) {
+    WorkCommitter workCommitter = mock(WorkCommitter.class);
+    doNothing().when(workCommitter).commit(any(Commit.class));
+    WindmillStream.GetDataStream getDataStream = 
mock(WindmillStream.GetDataStream.class);
+    when(getDataStream.requestKeyedData(anyString(), any()))
+        .thenReturn(Windmill.KeyedGetDataResponse.getDefaultInstance());

Review Comment:
   use lambda for getdata



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java:
##########
@@ -80,43 +87,46 @@ private static FailureTracker 
streamingApplianceFailureReporter(boolean isWorkFa
         ignored -> 
Windmill.ReportStatsResponse.newBuilder().setFailed(isWorkFailed).build());
   }
 
-  private static Work createWork(Supplier<Instant> clock, Consumer<Work> 
processWorkFn) {
-    return Work.create(
-        Windmill.WorkItem.newBuilder()
-            .setKey(ByteString.EMPTY)
-            .setWorkToken(1L)
-            .setCacheToken(1L)
-            .setShardingKey(1L)
-            .build(),
-        clock,
-        new ArrayList<>(),
+  private static ExecutableWork createWork(Supplier<Instant> clock, 
Consumer<Work> processWorkFn) {
+    WorkCommitter workCommitter = mock(WorkCommitter.class);

Review Comment:
   ditto



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import java.time.Duration;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StreamingCommitFinalizer {

Review Comment:
   annotate threadsafe and internal



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -307,15 +315,38 @@ public void cleanUp() {
         .ifPresent(ComputationStateCache::closeAndInvalidateAll);
   }
 
-  static Work createMockWork(long workToken) {
-    return createMockWork(workToken, work -> {});
+  private static ExecutableWork createMockWork(
+      ShardedKey shardedKey, long workToken, String computationId) {
+    return createMockWork(shardedKey, workToken, computationId, ignored -> {});
   }
 
-  static Work createMockWork(long workToken, Consumer<Work> processWorkFn) {
-    return Work.create(
-        
Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(workToken).build(),
-        Instant::now,
-        Collections.emptyList(),
+  private static ExecutableWork createMockWork(
+      ShardedKey shardedKey, long workToken, Consumer<Work> processWorkFn) {
+    return createMockWork(shardedKey, workToken, "computationId", 
processWorkFn);
+  }
+
+  private static ExecutableWork createMockWork(
+      ShardedKey shardedKey, long workToken, String computationId, 
Consumer<Work> processWorkFn) {
+    WorkCommitter workCommitter = mock(WorkCommitter.class);
+    doNothing().when(workCommitter).commit(any(Commit.class));
+    WindmillStream.GetDataStream getDataStream = 
mock(WindmillStream.GetDataStream.class);
+    when(getDataStream.requestKeyedData(anyString(), any()))
+        .thenReturn(Windmill.KeyedGetDataResponse.getDefaultInstance());
+    SideInputStateFetcher sideInputStateFetcher = 
mock(SideInputStateFetcher.class);

Review Comment:
   unused?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -307,15 +315,38 @@ public void cleanUp() {
         .ifPresent(ComputationStateCache::closeAndInvalidateAll);
   }
 
-  static Work createMockWork(long workToken) {
-    return createMockWork(workToken, work -> {});
+  private static ExecutableWork createMockWork(
+      ShardedKey shardedKey, long workToken, String computationId) {
+    return createMockWork(shardedKey, workToken, computationId, ignored -> {});
   }
 
-  static Work createMockWork(long workToken, Consumer<Work> processWorkFn) {
-    return Work.create(
-        
Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(workToken).build(),
-        Instant::now,
-        Collections.emptyList(),
+  private static ExecutableWork createMockWork(
+      ShardedKey shardedKey, long workToken, Consumer<Work> processWorkFn) {
+    return createMockWork(shardedKey, workToken, "computationId", 
processWorkFn);
+  }
+
+  private static ExecutableWork createMockWork(
+      ShardedKey shardedKey, long workToken, String computationId, 
Consumer<Work> processWorkFn) {
+    WorkCommitter workCommitter = mock(WorkCommitter.class);

Review Comment:
   use lambdas



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import java.time.Duration;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StreamingCommitFinalizer {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingCommitFinalizer.class);
+  private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = 
Duration.ofMinutes(5L);
+  private final Cache<Long, Runnable> onCommitFinalizedCache;
+  private final BoundedQueueExecutor workExecutor;
+
+  private StreamingCommitFinalizer(
+      Cache<Long, Runnable> onCommitFinalizedCache, BoundedQueueExecutor 
workExecutor) {
+    this.onCommitFinalizedCache = onCommitFinalizedCache;
+    this.workExecutor = workExecutor;
+  }
+
+  static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
+    return new StreamingCommitFinalizer(
+        
CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(),
+        workExecutor);
+  }
+
+  /**
+   * Stores a map of user worker generated id's and callbacks to execute once 
a commit has been
+   * successfully committed to the backing state store.
+   */
+  void cacheCommitFinalizers(Map<Long, Runnable> commitCallbacks) {
+    onCommitFinalizedCache.putAll(commitCallbacks);
+  }
+
+  /**
+   * Calls callbacks for WorkItem to mark that commit has been persisted 
(finalized) to the backing

Review Comment:
   I think the comment could be clearer. How about something like:
   
   When this method is called, the commits associated with the provided 
finalizeIds have been successfully persisted in the backing state store. If the 
commitCallback for the finalizationId is still cached, it is invoked.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import java.time.Duration;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StreamingCommitFinalizer {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingCommitFinalizer.class);
+  private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = 
Duration.ofMinutes(5L);
+  private final Cache<Long, Runnable> onCommitFinalizedCache;
+  private final BoundedQueueExecutor workExecutor;
+
+  private StreamingCommitFinalizer(
+      Cache<Long, Runnable> onCommitFinalizedCache, BoundedQueueExecutor 
workExecutor) {
+    this.onCommitFinalizedCache = onCommitFinalizedCache;
+    this.workExecutor = workExecutor;
+  }
+
+  static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
+    return new StreamingCommitFinalizer(
+        
CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(),
+        workExecutor);
+  }
+
+  /**
+   * Stores a map of user worker generated id's and callbacks to execute once 
a commit has been

Review Comment:
   how about being more specific "user worker generated finalization ids"
   since we have lots of ids floating around :)



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import com.google.auto.value.AutoValue;
+import java.util.HashMap;
+import java.util.Optional;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
+import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
+import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
+import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
+import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
+import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to process {@link Work} by executing user DoFns for a specific 
computation. May be reused to
+ * process future work items owned a computation.
+ *
+ * <p>Should only be accessed by 1 thread at a time.
+ *
+ * @implNote Once closed, it cannot be reused.
+ */
+// TODO(m-trieu): See if this can be combined/cleaned up with 
StreamingModeExecutionContext as the
+// seperation of responsibilities are unclear.
+@AutoValue
+@Internal
+@NotThreadSafe
+public abstract class ComputationWorkExecutor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ComputationWorkExecutor.class);
+
+  public static ComputationWorkExecutor.Builder builder() {
+    return new AutoValue_ComputationWorkExecutor.Builder();
+  }
+
+  public abstract DataflowWorkExecutor workExecutor();
+
+  public abstract StreamingModeExecutionContext context();
+
+  public abstract Optional<Coder<?>> keyCoder();
+
+  public abstract ExecutionStateTracker executionStateTracker();
+
+  /**
+   * Executes DoFns for the Work. Blocks the calling thread until DoFn(s) have 
completed execution.
+   */
+  public final void executeWork(
+      @Nullable Object key,
+      Work work,
+      WindmillStateReader stateReader,
+      SideInputStateFetcher sideInputStateFetcher,
+      Windmill.WorkItemCommitRequest.Builder outputBuilder)
+      throws Exception {
+    context().start(key, work, stateReader, sideInputStateFetcher, 
outputBuilder);
+    workExecutor().execute();
+  }
+
+  /**
+   * Callers should only invoke invalidate() when execution of work fails. 
Once closed, the instance
+   * cannot be reused.
+   */
+  public final void invalidate() {
+    context().invalidateCache();
+    closeWorkExecutor();
+  }
+
+  public final void closeWorkExecutor() {

Review Comment:
   Add a comment: when should this be called? Is it possible to reuse the 
instance after this method is called?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java:
##########
@@ -53,38 +60,69 @@ public class ActiveWorkStateTest {
   private final WindmillStateCache.ForComputation computationStateCache =
       mock(WindmillStateCache.ForComputation.class);
   @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
-  private Map<ShardedKey, Deque<Work>> readOnlyActiveWork;
+  private Map<ShardedKey, Deque<ExecutableWork>> readOnlyActiveWork;
 
   private ActiveWorkState activeWorkState;
 
   private static ShardedKey shardedKey(String str, long shardKey) {
     return ShardedKey.create(ByteString.copyFromUtf8(str), shardKey);
   }
 
-  private static Work createWork(Windmill.WorkItem workItem) {
-    return Work.create(workItem, Instant::now, Collections.emptyList(), unused 
-> {});
+  private static ExecutableWork createWork(Windmill.WorkItem workItem) {
+    WorkCommitter workCommitter = mock(WorkCommitter.class);
+    doNothing().when(workCommitter).commit(any(Commit.class));
+    WindmillStream.GetDataStream getDataStream = 
mock(WindmillStream.GetDataStream.class);
+    when(getDataStream.requestKeyedData(anyString(), any()))
+        .thenReturn(Windmill.KeyedGetDataResponse.getDefaultInstance());

Review Comment:
   workCommitter and getDataStream are unused after being created



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -249,4 +310,49 @@ private boolean isCommitPending() {
 
     abstract Instant startTime();
   }
+
+  @AutoValue
+  public abstract static class ProcessingContext {
+
+    private static ProcessingContext.Builder builder(
+        String computationId,
+        BiFunction<String, KeyedGetDataRequest, KeyedGetDataResponse> 
getKeyedDataFn) {
+      return new AutoValue_Work_ProcessingContext.Builder()
+          .setComputationId(computationId)
+          .setKeyedDataFetcher(
+              request -> 
Optional.ofNullable(getKeyedDataFn.apply(computationId, request)));
+    }
+
+    /** Computation that the {@link Work} belongs to. */
+    public abstract String computationId();
+
+    /**
+     * {@link WindmillStream.GetDataStream} that connects to the backend 
Windmill worker handling
+     * the {@link WorkItem}.
+     */
+    public abstract Function<KeyedGetDataRequest, 
Optional<KeyedGetDataResponse>>
+        keyedDataFetcher();
+
+    /**
+     * {@link WorkCommitter} that commits completed work to the backend 
Windmill worker handling the
+     * {@link WorkItem}.
+     */
+    public abstract Consumer<Commit> workCommitter();
+
+    public abstract Optional<WindmillStream.GetDataStream> getDataStream();

Review Comment:
   add comment
   
   Is this exposed only so we can do heartbeat batching, since we otherwise 
have keyedDataFetcher? If so, perhaps it would be better to change to some 
more restricted interface in a follow-up PR.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java:
##########
@@ -53,38 +60,69 @@ public class ActiveWorkStateTest {
   private final WindmillStateCache.ForComputation computationStateCache =
       mock(WindmillStateCache.ForComputation.class);
   @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
-  private Map<ShardedKey, Deque<Work>> readOnlyActiveWork;
+  private Map<ShardedKey, Deque<ExecutableWork>> readOnlyActiveWork;
 
   private ActiveWorkState activeWorkState;
 
   private static ShardedKey shardedKey(String str, long shardKey) {
     return ShardedKey.create(ByteString.copyFromUtf8(str), shardKey);
   }
 
-  private static Work createWork(Windmill.WorkItem workItem) {
-    return Work.create(workItem, Instant::now, Collections.emptyList(), unused 
-> {});
+  private static ExecutableWork createWork(Windmill.WorkItem workItem) {
+    WorkCommitter workCommitter = mock(WorkCommitter.class);
+    doNothing().when(workCommitter).commit(any(Commit.class));
+    WindmillStream.GetDataStream getDataStream = 
mock(WindmillStream.GetDataStream.class);
+    when(getDataStream.requestKeyedData(anyString(), any()))
+        .thenReturn(Windmill.KeyedGetDataResponse.getDefaultInstance());
+    return ExecutableWork.create(
+        Work.create(
+            workItem,
+            Watermarks.builder().setInputDataWatermark(Instant.EPOCH).build(),
+            createWorkProcessingContext(),
+            Instant::now,
+            Collections.emptyList()),
+        ignored -> {});
   }
 
-  private static Work expiredWork(Windmill.WorkItem workItem) {
-    return Work.create(workItem, () -> Instant.EPOCH, Collections.emptyList(), 
unused -> {});
+  private static ExecutableWork expiredWork(Windmill.WorkItem workItem) {
+    return ExecutableWork.create(
+        Work.create(
+            workItem,
+            Watermarks.builder().setInputDataWatermark(Instant.EPOCH).build(),
+            createWorkProcessingContext(),
+            () -> Instant.EPOCH,
+            Collections.emptyList()),
+        ignored -> {});
+  }
+
+  private static Work.ProcessingContext createWorkProcessingContext() {
+    WorkCommitter workCommitter = mock(WorkCommitter.class);
+    doNothing().when(workCommitter).commit(any(Commit.class));
+    WindmillStream.GetDataStream getDataStream = 
mock(WindmillStream.GetDataStream.class);
+    when(getDataStream.requestKeyedData(anyString(), any()))
+        .thenReturn(Windmill.KeyedGetDataResponse.getDefaultInstance());
+    return Work.createProcessingContext("computationId", 
getDataStream::requestKeyedData)
+        .setWorkCommitter(workCommitter::commit)

Review Comment:
   same comments as other test, just fake functions directly?
   
               Work.createProcessingContext(
                       "computationId", (a, b) -> 
Windmill.KeyedGetDataResponse.getDefaultInstance())
                   .setWorkCommitter(ignored -> {})



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitterTest.java:
##########
@@ -52,17 +59,25 @@ public class StreamingApplianceWorkCommitterTest {
   private FakeWindmillServer fakeWindmillServer;
   private StreamingApplianceWorkCommitter workCommitter;
 
-  private static Work createMockWork(long workToken, Consumer<Work> 
processWorkFn) {
+  private static Work createMockWork(long workToken) {
+    WorkCommitter workCommitter = mock(WorkCommitter.class);
+    doNothing().when(workCommitter).commit(any(Commit.class));

Review Comment:
   We don't expect any commit calls via this since we're committing directly 
below.
   Maybe use a lambda that throws an exception?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java:
##########
@@ -80,43 +87,46 @@ private static FailureTracker 
streamingApplianceFailureReporter(boolean isWorkFa
         ignored -> 
Windmill.ReportStatsResponse.newBuilder().setFailed(isWorkFailed).build());
   }
 
-  private static Work createWork(Supplier<Instant> clock, Consumer<Work> 
processWorkFn) {
-    return Work.create(
-        Windmill.WorkItem.newBuilder()
-            .setKey(ByteString.EMPTY)
-            .setWorkToken(1L)
-            .setCacheToken(1L)
-            .setShardingKey(1L)
-            .build(),
-        clock,
-        new ArrayList<>(),
+  private static ExecutableWork createWork(Supplier<Instant> clock, 
Consumer<Work> processWorkFn) {
+    WorkCommitter workCommitter = mock(WorkCommitter.class);
+    doNothing().when(workCommitter).commit(any(Commit.class));
+    WindmillStream.GetDataStream getDataStream = 
mock(WindmillStream.GetDataStream.class);
+    when(getDataStream.requestKeyedData(anyString(), any()))
+        .thenReturn(Windmill.KeyedGetDataResponse.getDefaultInstance());
+    return ExecutableWork.create(
+        Work.create(
+            
Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(1L).build(),
+            Watermarks.builder().setInputDataWatermark(Instant.EPOCH).build(),
+            Work.createProcessingContext("computationId", 
getDataStream::requestKeyedData)
+                .setWorkCommitter(workCommitter::commit)
+                .build(),
+            clock,
+            new ArrayList<>()),
         processWorkFn);
   }
 
-  private static Work createWork() {
-    return createWork(Instant::now, ignored -> {});
-  }
-
-  private static Work createWork(Consumer<Work> processWorkFn) {
+  private static ExecutableWork createWork(Consumer<Work> processWorkFn) {
     return createWork(Instant::now, processWorkFn);
   }
 
   @Test
   public void logAndProcessFailure_doesNotRetryKeyTokenInvalidException() {
-    Work work = spy(createWork());
+    Set<Work> executeWork = new HashSet<>();

Review Comment:
   nit: executedWork here and below



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java:
##########
@@ -69,17 +76,25 @@ public class StreamingEngineWorkCommitterTest {
   private FakeWindmillServer fakeWindmillServer;
   private Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory;
 
-  private static Work createMockWork(long workToken, Consumer<Work> 
processWorkFn) {
+  private static Work createMockWork(long workToken) {
+    WorkCommitter workCommitter = mock(WorkCommitter.class);

Review Comment:
   ditto: validate that this commit is not called, since we're committing 
directly below. Use a lambda for getdata.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3376,16 +3416,20 @@ public void testActiveWorkFailure() throws Exception {
   @Test
   public void testLatencyAttributionProtobufsPopulated() {
     FakeClock clock = new FakeClock();
+    WorkCommitter workCommitter = mock(WorkCommitter.class);
+    doNothing().when(workCommitter).commit(any(Commit.class));
+    WindmillStream.GetDataStream getDataStream = 
mock(WindmillStream.GetDataStream.class);
+    when(getDataStream.requestKeyedData(anyString(), any()))

Review Comment:
   ditto



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/DispatchedActiveWorkRefresherTest.java:
##########
@@ -106,16 +114,27 @@ private ActiveWorkRefresher createActiveWorkRefresher(
         Executors.newSingleThreadScheduledExecutor());
   }
 
-  private Work createOldWork(int workIds, Consumer<Work> processWork) {
-    return Work.create(
-        Windmill.WorkItem.newBuilder()
-            .setWorkToken(workIds)
-            .setCacheToken(workIds)
-            .setKey(ByteString.EMPTY)
-            .setShardingKey(workIds)
-            .build(),
-        DispatchedActiveWorkRefresherTest.A_LONG_TIME_AGO,
-        ImmutableList.of(),
+  private ExecutableWork createOldWork(
+      ShardedKey shardedKey, int workIds, Consumer<Work> processWork) {
+    WorkCommitter workCommitter = mock(WorkCommitter.class);
+    doNothing().when(workCommitter).commit(any(Commit.class));
+    WindmillStream.GetDataStream getDataStream = 
mock(WindmillStream.GetDataStream.class);
+    when(getDataStream.requestKeyedData(anyString(), any()))
+        .thenReturn(Windmill.KeyedGetDataResponse.getDefaultInstance());

Review Comment:
   ditto



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import java.time.Duration;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StreamingCommitFinalizer {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingCommitFinalizer.class);
+  private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = 
Duration.ofMinutes(5L);
+  private final Cache<Long, Runnable> onCommitFinalizedCache;
+  private final BoundedQueueExecutor workExecutor;

Review Comment:
   nit: rename throughout to just executor or finalizationExecutor
   
   since we're not dealing with work here, it's a little confusing to call it 
workExecutor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to