scwhittle commented on code in PR #30764:
URL: https://github.com/apache/beam/pull/30764#discussion_r1574449155


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java:
##########
@@ -18,30 +18,41 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import com.google.api.client.util.Clock;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.beam.runners.dataflow.util.TimeUtil;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@NotThreadSafe
+@Internal
 public class HotKeyLogger {
   private final Logger LOG = LoggerFactory.getLogger(HotKeyLogger.class);
 
  /** Clock used to either provide real system time or mocked to virtualize time for testing. */
-  private Clock clock = Clock.SYSTEM;
-
+  private final Clock clock;
+  /** Throttles logging the detection to every loggingPeriod */
+  private final Duration loggingPeriod = Duration.standardMinutes(5);
   /**
   * The previous time the HotKeyDetection was logged. This is used to throttle logging to every 5
    * minutes.
    */
   private long prevHotKeyDetectionLogMs = 0;
 
-  /** Throttles logging the detection to every loggingPeriod */
-  private final Duration loggingPeriod = Duration.standardMinutes(5);
+  private HotKeyLogger(Clock clock) {
+    this.clock = clock;
+  }
 
-  HotKeyLogger() {}
+  public static HotKeyLogger ofSystemClock() {

Review Comment:
   just name it create?
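   
   e.g., a sketch of what that could look like (keeping the private constructor; the test-only factory name is just a suggestion):
   
   ```java
   public static HotKeyLogger create() {
     return new HotKeyLogger(Clock.SYSTEM);
   }
   
   @VisibleForTesting
   static HotKeyLogger forTesting(Clock testClock) {
     return new HotKeyLogger(testClock);
   }
   ```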



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsLogger.java:
##########
@@ -33,10 +33,18 @@ public class MetricsLogger extends MetricsContainerImpl {
   AtomicLong lastReportedMillis = new AtomicLong(System.currentTimeMillis());
   @Nullable MetricsContainerImpl lastMetricsSnapshot = null;
 
-  public MetricsLogger(@Nullable String stepName) {
+  private MetricsLogger(@Nullable String stepName) {
     super(stepName);
   }
 
+  public static MetricsLogger createUnboundedMetricsLogger() {

Review Comment:
   unbounded is confusing with bounded/unbounded PCollections.
   How about workerMetricsLogger if it holds metrics not scoped to a step?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingMDC.java:
##########
@@ -17,15 +17,19 @@
  */
 package org.apache.beam.runners.dataflow.worker.logging;
 
+import org.checkerframework.checker.nullness.qual.Nullable;
+
 /** Mapped diagnostic context for the Dataflow worker. */
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   can this be removed with your changes?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -74,8 +79,22 @@ public ImmutableMap<String, String> getTransformUserNameToStateFamily() {
     return transformUserNameToStateFamily;
   }
 
-  public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue() {
-    return executionStateQueue;
+  /**
+   * Adds the {@link ExecutionState} to the internal {@link #executionStateQueue} so that it can be

Review Comment:
   nit: I wouldn't reference the internal variable; just note that it adds to a 
cache and may be returned on subsequent acquireExecutionState calls.
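   
   e.g. a sketch of the wording I have in mind:
   
   ```java
   /**
    * Returns the ExecutionState to a cache of available states, from which it may
    * be handed back out by a subsequent acquireExecutionState call.
    */
   public void releaseExecutionState(ExecutionState executionState) {
     executionStateQueue.offer(executionState);
   }
   ```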



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -128,21 +152,26 @@ private void forceExecute(Work work) {
     executor.forceExecute(work, work.getWorkItem().getSerializedSize());
   }
 
-  /** Gets HeartbeatRequests for any work started before refreshDeadline. */
-  public ImmutableList<HeartbeatRequest> getKeyHeartbeats(
-      Instant refreshDeadline, DataflowExecutionStateSampler sampler) {
-    return activeWorkState.getKeyHeartbeats(refreshDeadline, sampler);
+  public ImmutableMap<ShardedKey, Deque<Work>> currentActiveWorkReadOnly() {
+    return activeWorkState.getReadOnlyActiveWork();
   }
 
   public void printActiveWork(PrintWriter writer) {
     activeWorkState.printActiveWork(writer, Instant.now());
   }
 
-  @Override
-  public void close() throws Exception {
+  public GetWorkBudget getActiveWorkBudget() {
+    return activeWorkState.currentActiveWorkBudget();
+  }
+
+  public final void close() {
     @Nullable ExecutionState executionState;
     while ((executionState = executionStateQueue.poll()) != null) {
-      executionState.workExecutor().close();
+      try {
+        executionState.workExecutor().close();

Review Comment:
   seems better to call executionState.close() instead, since it is added in this PR
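   
   i.e. roughly (sketch, using the ExecutionState.close() added in this PR):
   
   ```java
   public final void close() {
     @Nullable ExecutionState executionState;
     while ((executionState = executionStateQueue.poll()) != null) {
       // ExecutionState.close() invalidates the cache and closes the work executor.
       executionState.close();
     }
   }
   ```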



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -74,8 +79,22 @@ public ImmutableMap<String, String> getTransformUserNameToStateFamily() {
     return transformUserNameToStateFamily;
   }
 
-  public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue() {
-    return executionStateQueue;
+  /**
+   * Adds the {@link ExecutionState} to the internal {@link #executionStateQueue} so that it can be
+   * re-used in future processing.
+   */
+  public void releaseExecutionState(ExecutionState executionState) {
+    executionStateQueue.offer(executionState);
+  }
+
+  /**
+   * Removes an {@link ExecutionState} instance from {@link #executionStateQueue} if one exists, and

Review Comment:
   ditto: instead of referring to the internal impl, just note that it returns null 
or a state that was previously offered with releaseExecutionState
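   
   e.g. (sketch):
   
   ```java
   /**
    * Returns and removes an ExecutionState that was previously offered via
    * releaseExecutionState, or null if none are cached.
    */
   ```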



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
+    extends CacheLoader<String, Optional<ComputationState>> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamingApplianceComputationStateCacheLoader.class);
+
+  private final StreamingConfigLoader<Windmill.GetConfigResponse> streamingApplianceConfigLoader;
+  private final BoundedQueueExecutor workUnitExecutor;
+  private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+  private final Function<String, WindmillStateCache.ForComputation>
+      perComputationStateCacheViewFactory;
+
+  public StreamingApplianceComputationStateCacheLoader(
+      StreamingConfigLoader<Windmill.GetConfigResponse> streamingApplianceConfigLoader,
+      BoundedQueueExecutor workUnitExecutor,
+      Function<String, WindmillStateCache.ForComputation> perComputationStateCacheViewFactory) {
+    this.streamingApplianceConfigLoader = streamingApplianceConfigLoader;
+    this.workUnitExecutor = workUnitExecutor;
+    this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+    this.perComputationStateCacheViewFactory = perComputationStateCacheViewFactory;
+  }
+
+  private static Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId(
+      Windmill.GetConfigResponse response) {
+    // Outer keys are computation ids. Outer values are map from transform username to state
+    // family.
+    Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId =
+        new HashMap<>();
+    for (Windmill.GetConfigResponse.ComputationConfigMapEntry computationConfig :
+        response.getComputationConfigMapList()) {
+      Map<String, String> transformUserNameToStateFamily =
+          transformUserNameToStateFamilyByComputationId.computeIfAbsent(
+              computationConfig.getComputationId(), k -> new HashMap<>());
+      for (Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry entry :
+          computationConfig.getComputationConfig().getTransformUserNameToStateFamilyList()) {
+        transformUserNameToStateFamily.put(entry.getTransformUserName(), entry.getStateFamily());
+      }
+    }
+
+    return transformUserNameToStateFamilyByComputationId;
+  }
+
+  /** Deserialize {@link MapTask} and populate MultiOutputInfos in MapTask. */
+  private static Optional<MapTask> deserializeAndFixMapTask(String serializedMapTask) {
+    try {
+      return Optional.of(
+          StreamingWorkerEnvironment.fixMapTaskMultiOutputInfoFnInstance()
+              .apply(Transport.getJsonFactory().fromString(serializedMapTask, MapTask.class)));
+    } catch (IOException e) {
+      LOG.warn("Parsing MapTask failed: {}", serializedMapTask, e);
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<ComputationState> load(String computationId) throws Exception {
+    return Preconditions.checkNotNull(
+        loadAll(ImmutableList.of(computationId)).getOrDefault(computationId, Optional.empty()));
+  }
+
+  @Override
+  public Map<String, Optional<ComputationState>> loadAll(
+      Iterable<? extends String> computationIds) {
+    Optional<Windmill.GetConfigResponse> maybeResponse =
+        streamingApplianceConfigLoader.getComputationConfig(
+            Iterables.getOnlyElement(computationIds));

Review Comment:
   can there be a separate assert for a single element?
   Or what about moving the implementation to load and changing loadAll to 
iterate over ids and call load for each one?
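   
   The second option, roughly (sketch; assumes the single-id fetch logic moves into load):
   
   ```java
   @Override
   public Map<String, Optional<ComputationState>> loadAll(
       Iterable<? extends String> computationIds) throws Exception {
     ImmutableMap.Builder<String, Optional<ComputationState>> result = ImmutableMap.builder();
     for (String computationId : computationIds) {
       result.put(computationId, load(computationId));
     }
     return result.build();
   }
   ```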



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ApplianceWorkerHarness.java:
##########
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory;
+import org.apache.beam.runners.dataflow.worker.DataflowWorkUnitClient;
+import org.apache.beam.runners.dataflow.worker.HotKeyLogger;
+import org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory;
+import org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub;
+import org.apache.beam.runners.dataflow.worker.ReaderCache;
+import org.apache.beam.runners.dataflow.worker.WindmillComputationKey;
+import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache;
+import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
+import org.apache.beam.runners.dataflow.worker.streaming.StreamingApplianceComputationStateCacheLoader;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceConfigLoader;
+import org.apache.beam.runners.dataflow.worker.streaming.processing.ExecutionStateFactory;
+import org.apache.beam.runners.dataflow.worker.streaming.processing.StreamingCommitFinalizer;
+import org.apache.beam.runners.dataflow.worker.streaming.processing.StreamingWorkScheduler;
+import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationWorkItems;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkProcessingContext;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.StreamingApplianceFailureTracker;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.WorkFailureProcessor;
+import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkRefresher;
+import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.DispatchedActiveWorkRefresher;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Streaming Appliance worker harness implementation. */
+@Internal
+@ThreadSafe
+public final class ApplianceWorkerHarness implements StreamingWorkerHarness {
+  private static final Logger LOG = LoggerFactory.getLogger(ApplianceWorkerHarness.class);
+  private static final String MEMORY_MONITOR_EXECUTOR = "MemoryMonitor";
+  private static final String GET_WORK_EXECUTOR = "GetWorkDispatcher";
+  private static final String REFRESH_WORK_EXECUTOR = "RefreshWork";
+
+  private final DataflowWorkerHarnessOptions options;
+  private final long clientId;
+  private final WindmillServerStub windmillServer;
+  private final MetricTrackingWindmillServerStub getDataClient;
+  private final MemoryMonitor memoryMonitor;
+  private final ComputationStateCache computationCache;
+  private final StreamingWorkScheduler streamingWorkScheduler;
+  private final WorkCommitter workCommitter;
+  private final StreamingWorkerStatusReporter workerStatusReporter;
+  private final StreamingStatusPages statusPages;
+  private final DataflowExecutionStateSampler sampler;
+  private final ActiveWorkRefresher activeWorkRefresher;
+  private final BoundedQueueExecutor workExecutor;
+  private final ExecutorService memoryMonitoryExecutor;
+  private final ExecutorService getWorkExecutor;
+  private final AtomicBoolean isRunning;
+
+  private ApplianceWorkerHarness(
+      DataflowWorkerHarnessOptions options,
+      long clientId,
+      WindmillServerStub windmillServer,
+      MetricTrackingWindmillServerStub getDataClient,
+      MemoryMonitor memoryMonitor,
+      ComputationStateCache computationCache,
+      StreamingWorkScheduler streamingWorkScheduler,
+      WorkCommitter workCommitter,
+      StreamingWorkerStatusReporter workerStatusReporter,
+      StreamingStatusPages statusPages,
+      DataflowExecutionStateSampler sampler,
+      ActiveWorkRefresher activeWorkRefresher,
+      ExecutorService memoryMonitoryExecutor,
+      ExecutorService getWorkExecutor,
+      AtomicBoolean isRunning,
+      BoundedQueueExecutor workExecutor) {
+    this.options = options;
+    this.clientId = clientId;
+    this.windmillServer = windmillServer;
+    this.getDataClient = getDataClient;
+    this.memoryMonitor = memoryMonitor;
+    this.computationCache = computationCache;
+    this.streamingWorkScheduler = streamingWorkScheduler;
+    this.workCommitter = workCommitter;
+    this.workerStatusReporter = workerStatusReporter;
+    this.statusPages = statusPages;
+    this.sampler = sampler;
+    this.activeWorkRefresher = activeWorkRefresher;
+    this.memoryMonitoryExecutor = memoryMonitoryExecutor;
+    this.getWorkExecutor = getWorkExecutor;
+    this.isRunning = isRunning;
+    this.workExecutor = workExecutor;
+    // Register standard file systems.
+    FileSystems.setDefaultPipelineOptions(options);
+  }
+
+  public static ApplianceWorkerHarness fromOptions(DataflowWorkerHarnessOptions options) {

Review Comment:
   At a high level, the difference between the appliance/dispatcher/direct paths 
seems to be limited to:
   - how to get the config
   - how to get/schedule WorkProcessingContext (which then knows how to 
fetch/commit/heartbeat to the right worker).
   
   I think that moving things out of StreamingWorkerHarness into classes can 
help testing/readability, but there is still a lot of duplicated setup 
between the different harnesses which will make it harder to maintain.
   
   Can we instead have a shared StreamingDataflowWorker that uses these objects, 
with just minimal injection differences made when constructing for 
appliance/fanout/direct?
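   
   Very roughly the shape I have in mind (illustrative only; GetWorkSender is a made-up seam, not an existing class):
   
   ```java
   // Hypothetical sketch: one shared worker, with per-mode wiring injected.
   interface GetWorkSender {
     void start();
   }
   
   final class StreamingDataflowWorker {
     private final GetWorkSender getWorkSender;
   
     private StreamingDataflowWorker(GetWorkSender getWorkSender) {
       this.getWorkSender = getWorkSender;
     }
   
     // Only the injected pieces differ between appliance/fanout/direct.
     static StreamingDataflowWorker forAppliance(GetWorkSender applianceGetWork) {
       return new StreamingDataflowWorker(applianceGetWork);
     }
   
     static StreamingDataflowWorker forDirectPath(GetWorkSender directPathGetWork) {
       return new StreamingDataflowWorker(directPathGetWork);
     }
   
     void start() {
       getWorkSender.start();
     }
   }
   ```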
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -74,8 +79,22 @@ public ImmutableMap<String, String> getTransformUserNameToStateFamily() {
     return transformUserNameToStateFamily;
   }
 
-  public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue() {
-    return executionStateQueue;
+  /**
+   * Adds the {@link ExecutionState} to the internal {@link #executionStateQueue} so that it can be
+   * re-used in future processing.
+   */
+  public void releaseExecutionState(ExecutionState executionState) {
+    executionStateQueue.offer(executionState);

Review Comment:
   should we reset() the execution state? Or add a comment that it should be 
reset before adding here?
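   
   e.g. (sketch; assumes ExecutionState gets a reset() that clears per-work-item state):
   
   ```java
   public void releaseExecutionState(ExecutionState executionState) {
     // Clear per-work-item state so the cached instance is safe to re-use.
     executionState.reset();
     executionStateQueue.offer(executionState);
   }
   ```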



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader

Review Comment:
   perhaps the computation cache and cache loaders might be a good thing to 
separate out from this PR so it isn't as monolithic?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -90,11 +83,11 @@ private ActiveWorkState(
   }
 
  static ActiveWorkState create(WindmillStateCache.ForComputation computationStateCache) {
-    return new ActiveWorkState(new HashMap<>(), computationStateCache);
+    return new ActiveWorkState(new ConcurrentHashMap<>(), computationStateCache);

Review Comment:
   why is the concurrent hash map required? It seems getReadOnlyActiveWork is 
synchronized below.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -321,11 +291,8 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
     return stuckCommits.build();
   }
 
-  synchronized ImmutableList<HeartbeatRequest> getKeyHeartbeats(
-      Instant refreshDeadline, DataflowExecutionStateSampler sampler) {
-    return activeWork.entrySet().stream()
-        .flatMap(entry -> toHeartbeatRequestStream(entry, refreshDeadline, sampler))
-        .collect(toImmutableList());
+  synchronized ImmutableMap<ShardedKey, Deque<Work>> getReadOnlyActiveWork() {
+    return ImmutableMap.copyOf(activeWork);

Review Comment:
   this is just a shallow copy (the deques are shared), so it seems like there 
will be concurrency issues.
   Instead of having to copy deeper, you could instead have a visitor pattern 
where you accept some function to run over the keys and queued work with 
appropriate synchronization.
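   
   e.g. something like (sketch; BiConsumer is java.util.function.BiConsumer):
   
   ```java
   synchronized void forEachActiveWork(BiConsumer<ShardedKey, Work> visitor) {
     for (Map.Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
       for (Work work : entry.getValue()) {
         visitor.accept(entry.getKey(), work);
       }
     }
   }
   ```
   
   That keeps the iteration under this object's lock instead of handing out the live deques.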



##########
runners/google-cloud-dataflow-java/worker/build.gradle:
##########
@@ -257,3 +257,4 @@ checkstyleMain.enabled = false
 checkstyleTest.enabled = false
 //TODO(https://github.com/apache/beam/issues/19119): javadoc task should be enabled in the future.
 javadoc.enabled = false
+test.outputs.upToDateWhen {false}

Review Comment:
   remove?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCache.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static java.util.stream.Collectors.toMap;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Cache of {@link String} computationId to {@link ComputationState} objects. */
+@Internal
+@ThreadSafe
+public final class ComputationStateCache implements StatusDataProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ComputationStateCache.class);
+
+  private final LoadingCache<String, Optional<ComputationState>> computationCache;
+
+  private ComputationStateCache(LoadingCache<String, Optional<ComputationState>> computationCache) {
+    this.computationCache = computationCache;
+  }
+
+  public static ComputationStateCache create(
+      CacheLoader<String, Optional<ComputationState>> cacheLoader) {
+    return new ComputationStateCache(CacheBuilder.newBuilder().build(cacheLoader));
+  }
+
+  /**
+   * Returns the {@link ComputationState} associated with the given computationId. May perform IO if
+   * a value is not present, and it is possible that after IO is performed there is no value
+   * correlated with that computationId.
+   */
+  public Optional<ComputationState> getComputationState(String computationId) {
+    try {
+      Optional<ComputationState> computationState =
+          computationCache.getAll(ImmutableList.of(computationId)).get(computationId);
+      return computationState != null ? computationState : Optional.empty();

Review Comment:
   I don't think null is possible for loading cache values



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
+    extends CacheLoader<String, Optional<ComputationState>> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamingApplianceComputationStateCacheLoader.class);
+
+  private final StreamingConfigLoader<Windmill.GetConfigResponse> streamingApplianceConfigLoader;
+  private final BoundedQueueExecutor workUnitExecutor;
+  private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+  private final Function<String, WindmillStateCache.ForComputation>
+      perComputationStateCacheViewFactory;
+
+  public StreamingApplianceComputationStateCacheLoader(
+      StreamingConfigLoader<Windmill.GetConfigResponse> streamingApplianceConfigLoader,
+      BoundedQueueExecutor workUnitExecutor,
+      Function<String, WindmillStateCache.ForComputation> perComputationStateCacheViewFactory) {
+    this.streamingApplianceConfigLoader = streamingApplianceConfigLoader;
+    this.workUnitExecutor = workUnitExecutor;
+    this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+    this.perComputationStateCacheViewFactory = perComputationStateCacheViewFactory;
+  }
+
+  private static Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId(
+      Windmill.GetConfigResponse response) {
+    // Outer keys are computation ids. Outer values are map from transform username to state
+    // family.
+    Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId =
+        new HashMap<>();
+    for (Windmill.GetConfigResponse.ComputationConfigMapEntry computationConfig :
+        response.getComputationConfigMapList()) {
+      Map<String, String> transformUserNameToStateFamily =
+          transformUserNameToStateFamilyByComputationId.computeIfAbsent(
+              computationConfig.getComputationId(), k -> new HashMap<>());
+      for (Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry entry :
+          computationConfig.getComputationConfig().getTransformUserNameToStateFamilyList()) {
+        transformUserNameToStateFamily.put(entry.getTransformUserName(), entry.getStateFamily());
+      }
+    }
+
+    return transformUserNameToStateFamilyByComputationId;
+  }
+
+  /** Deserialize {@link MapTask} and populate MultiOutputInfos in MapTask. */
+  private static Optional<MapTask> deserializeAndFixMapTask(String serializedMapTask) {
+    try {
+      return Optional.of(
+          StreamingWorkerEnvironment.fixMapTaskMultiOutputInfoFnInstance()
+              .apply(Transport.getJsonFactory().fromString(serializedMapTask, MapTask.class)));
+    } catch (IOException e) {
+      LOG.warn("Parsing MapTask failed: {}", serializedMapTask, e);
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<ComputationState> load(String computationId) throws Exception {
+    return Preconditions.checkNotNull(
+        loadAll(ImmutableList.of(computationId)).getOrDefault(computationId, Optional.empty()));
+  }
+
+  @Override
+  public Map<String, Optional<ComputationState>> loadAll(
+      Iterable<? extends String> computationIds) {
+    Optional<Windmill.GetConfigResponse> maybeResponse =
+        streamingApplianceConfigLoader.getComputationConfig(
+            Iterables.getOnlyElement(computationIds));
+    if (!maybeResponse.isPresent()) {
+      return ImmutableMap.of();
+    }
+
+    Windmill.GetConfigResponse response = maybeResponse.get();
+
+    // The max work item commit bytes should be modified to be dynamic once it is available in

Review Comment:
   odd spot for this comment; the global config has this field but not 
individual computations.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
+    extends CacheLoader<String, Optional<ComputationState>> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamingApplianceComputationStateCacheLoader.class);
+
+  private final StreamingConfigLoader<Windmill.GetConfigResponse> streamingApplianceConfigLoader;
+  private final BoundedQueueExecutor workUnitExecutor;
+  private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+  private final Function<String, WindmillStateCache.ForComputation>
+      perComputationStateCacheViewFactory;
+
+  public StreamingApplianceComputationStateCacheLoader(
+      StreamingConfigLoader<Windmill.GetConfigResponse> streamingApplianceConfigLoader,
+      BoundedQueueExecutor workUnitExecutor,
+      Function<String, WindmillStateCache.ForComputation> perComputationStateCacheViewFactory) {
+    this.streamingApplianceConfigLoader = streamingApplianceConfigLoader;
+    this.workUnitExecutor = workUnitExecutor;
+    this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+    this.perComputationStateCacheViewFactory = perComputationStateCacheViewFactory;
+  }
+
+  private static Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId(
+      Windmill.GetConfigResponse response) {
+    // Outer keys are computation ids. Outer values are map from transform username to state

Review Comment:
   nit: "The inner map maps from ..."
   seems like it would read better than "Outer values are map".
   
   Also, since this is returned, it would be better to move the comment out of the method.
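   
   e.g. (sketch of the javadoc):
   
   ```java
   /**
    * Returns a map keyed by computation id; the inner map maps from transform
    * user name to state family.
    */
   ```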



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java:
##########
@@ -51,4 +54,13 @@ public abstract static class Builder {
 
     public abstract ExecutionState build();
   }
+
+  public final void close() {
+    try {
+      context().invalidateCache();
+      workExecutor().close();
+    } catch (Exception e) {
+      LOG.warn("Failed to close map task executor: ", e);

Review Comment:
   either update the log to indicate it may be invalidating the cache as well, or 
remove invalidating the cache from the try block
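   
   The second option would look roughly like (sketch):
   
   ```java
   public final void close() {
     context().invalidateCache();
     try {
       workExecutor().close();
     } catch (Exception e) {
       LOG.warn("Failed to close map task executor: ", e);
     }
   }
   ```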



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
+    extends CacheLoader<String, Optional<ComputationState>> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamingApplianceComputationStateCacheLoader.class);
+
+  private final StreamingConfigLoader<Windmill.GetConfigResponse> streamingApplianceConfigLoader;
+  private final BoundedQueueExecutor workUnitExecutor;
+  private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+  private final Function<String, WindmillStateCache.ForComputation>
+      perComputationStateCacheViewFactory;
+
+  public StreamingApplianceComputationStateCacheLoader(
+      StreamingConfigLoader<Windmill.GetConfigResponse> streamingApplianceConfigLoader,
+      BoundedQueueExecutor workUnitExecutor,
+      Function<String, WindmillStateCache.ForComputation> perComputationStateCacheViewFactory) {
+    this.streamingApplianceConfigLoader = streamingApplianceConfigLoader;
+    this.workUnitExecutor = workUnitExecutor;
+    this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+    this.perComputationStateCacheViewFactory = perComputationStateCacheViewFactory;
+  }
+
+  private static Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId(
+      Windmill.GetConfigResponse response) {
+    // Outer keys are computation ids. Outer values are map from transform username to state
+    // family.
+    Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId =
+        new HashMap<>();
+    for (Windmill.GetConfigResponse.ComputationConfigMapEntry computationConfig :
+        response.getComputationConfigMapList()) {
+      Map<String, String> transformUserNameToStateFamily =
+          transformUserNameToStateFamilyByComputationId.computeIfAbsent(
+              computationConfig.getComputationId(), k -> new HashMap<>());
+      for (Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry entry :
+          computationConfig.getComputationConfig().getTransformUserNameToStateFamilyList()) {
+        transformUserNameToStateFamily.put(entry.getTransformUserName(), entry.getStateFamily());
+      }
+    }
+
+    return transformUserNameToStateFamilyByComputationId;
+  }
+
+  /** Deserialize {@link MapTask} and populate MultiOutputInfos in MapTask. */
+  private static Optional<MapTask> deserializeAndFixMapTask(String serializedMapTask) {
+    try {
+      return Optional.of(
+          StreamingWorkerEnvironment.fixMapTaskMultiOutputInfoFnInstance()
+              .apply(Transport.getJsonFactory().fromString(serializedMapTask, MapTask.class)));
+    } catch (IOException e) {
+      LOG.warn("Parsing MapTask failed: {}", serializedMapTask, e);
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<ComputationState> load(String computationId) throws Exception {
+    return Preconditions.checkNotNull(
+        loadAll(ImmutableList.of(computationId)).getOrDefault(computationId, Optional.empty()));
+  }
+
+  @Override
+  public Map<String, Optional<ComputationState>> loadAll(
+      Iterable<? extends String> computationIds) {
+    Optional<Windmill.GetConfigResponse> maybeResponse =
+        streamingApplianceConfigLoader.getComputationConfig(
+            Iterables.getOnlyElement(computationIds));
+    if (!maybeResponse.isPresent()) {
+      return ImmutableMap.of();
+    }
+
+    Windmill.GetConfigResponse response = maybeResponse.get();
+
+    // The max work item commit bytes should be modified to be dynamic once it is available in
+    // the request.
+    for (Windmill.GetConfigResponse.SystemNameToComputationIdMapEntry entry :
+        response.getSystemNameToComputationIdMapList()) {
+      systemNameToComputationIdMap.put(entry.getSystemName(), entry.getComputationId());

Review Comment:
   can you get rid of this?
   It seems we just request a single item, so should we just get a single 
response back? Can we then just pass the computationId to 
createComputationState instead of trying to map back from the response?
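   
   e.g. roughly (sketch; assumes createComputationState takes the id plus the parsed config):
   
   ```java
   @Override
   public Optional<ComputationState> load(String computationId) throws Exception {
     return streamingApplianceConfigLoader
         .getComputationConfig(computationId)
         .flatMap(response -> createComputationState(computationId, response));
   }
   ```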



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkProcessingContext.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work;
+
+import com.google.auto.value.AutoValue;
+import com.google.auto.value.extension.memoized.Memoized;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/** Context to process {@link WorkItem} */
+@AutoValue
+@Internal
+public abstract class WorkProcessingContext {
+  public static WorkProcessingContext.Builder builder() {
+    return new AutoValue_WorkProcessingContext.Builder();
+  }
+
+  public static WorkProcessingContext.Builder builder(
+      String computationId,
+      BiFunction<String, KeyedGetDataRequest, Optional<KeyedGetDataResponse>> keyedDataFetcher) {
+    return builder()
+        .setComputationId(computationId)
+        .setKeyedDataFetcher(request -> keyedDataFetcher.apply(computationId, request));
+  }
+
+  public static WorkProcessingContext.Builder builder(
+      String computationId, GetDataStream getDataStream) {
+    return builder()
+        .setComputationId(computationId)
+        .setGetDataStream(getDataStream)
+        .setKeyedDataFetcher(
+            request -> Optional.ofNullable(getDataStream.requestKeyedData(computationId, request)));
+  }
+
+  public abstract String computationId();
+
+  public abstract Instant inputDataWatermark();
+
+  public abstract @Nullable Instant synchronizedProcessingTime();
+
+  public abstract @Nullable Instant outputDataWatermark();
+
+  public abstract @Nullable GetDataStream getDataStream();
+
+  /** {@link WorkItem} being processed. */
+  public abstract WorkItem workItem();
+
+  /**
+   * {@link GetDataStream} that connects to the backend Windmill worker handling the {@link
+   * WorkItem}.
+   */
+  public abstract Function<KeyedGetDataRequest, Optional<KeyedGetDataResponse>> keyedDataFetcher();
+
+  /**
+   * {@link WorkCommitter} that commits completed work to the backend Windmill worker handling the
+   * {@link WorkItem}.
+   */
+  public abstract Consumer<Commit> workCommitter();
+
+  public final void queueCommit(Commit commit) {
+    workCommitter().accept(commit);
+  }
+
+  @Memoized
+  public ShardedKey shardedKey() {
+    return ShardedKey.create(workItem().getKey(), workItem().getShardingKey());
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setComputationId(String value);
+
+    public abstract Builder setInputDataWatermark(Instant value);
+
+    public abstract Builder setSynchronizedProcessingTime(@Nullable Instant value);
+
+    public abstract Builder setOutputDataWatermark(@Nullable Instant value);
+
+    public abstract Builder setWorkItem(WorkItem value);
+
+    public abstract Builder setKeyedDataFetcher(
+        Function<KeyedGetDataRequest, Optional<KeyedGetDataResponse>> value);
+
+    public abstract Builder setWorkCommitter(Consumer<Commit> value);
+
+    public abstract Builder setGetDataStream(GetDataStream value);
+
+    abstract WorkProcessingContext autoBuild();

Review Comment:
   private?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCache.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static java.util.stream.Collectors.toMap;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Cache of {@link String} computationId to {@link ComputationState} objects. */
+@Internal
+@ThreadSafe
+public final class ComputationStateCache implements StatusDataProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ComputationStateCache.class);
+
+  private final LoadingCache<String, Optional<ComputationState>> computationCache;
+
+  private ComputationStateCache(LoadingCache<String, Optional<ComputationState>> computationCache) {
+    this.computationCache = computationCache;
+  }
+
+  public static ComputationStateCache create(
+      CacheLoader<String, Optional<ComputationState>> cacheLoader) {
+    return new ComputationStateCache(CacheBuilder.newBuilder().build(cacheLoader));
+  }
+
+  /**
+   * Returns the {@link ComputationState} associated with the given computationId. May perform IO if
+   * a value is not present, and it is possible that after IO is performed there is no value
+   * correlated with that computationId.
+   */
+  public Optional<ComputationState> getComputationState(String computationId) {
+    try {
+      Optional<ComputationState> computationState =
+          computationCache.getAll(ImmutableList.of(computationId)).get(computationId);

Review Comment:
   use get() instead of getAll
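   
   i.e. (sketch; error handling is illustrative):
   
   ```java
   public Optional<ComputationState> getComputationState(String computationId) {
     try {
       // LoadingCache.get never returns null; absence is modeled by the loader's Optional.
       return computationCache.get(computationId);
     } catch (ExecutionException e) {
       throw new RuntimeException(e);
     }
   }
   ```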



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
+    extends CacheLoader<String, Optional<ComputationState>> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamingApplianceComputationStateCacheLoader.class);
+
+  private final StreamingConfigLoader<Windmill.GetConfigResponse> streamingApplianceConfigLoader;
+  private final BoundedQueueExecutor workUnitExecutor;
+  private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+  private final Function<String, WindmillStateCache.ForComputation>
+      perComputationStateCacheViewFactory;
+
+  public StreamingApplianceComputationStateCacheLoader(
+      StreamingConfigLoader<Windmill.GetConfigResponse> streamingApplianceConfigLoader,
+      BoundedQueueExecutor workUnitExecutor,
+      Function<String, WindmillStateCache.ForComputation> perComputationStateCacheViewFactory) {
+    this.streamingApplianceConfigLoader = streamingApplianceConfigLoader;
+    this.workUnitExecutor = workUnitExecutor;
+    this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+    this.perComputationStateCacheViewFactory = perComputationStateCacheViewFactory;
+  }
+
+  private static Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId(
+      Windmill.GetConfigResponse response) {
+    // Outer keys are computation ids; outer values map transform user name to state family.
+    Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId =
+        new HashMap<>();
+    for (Windmill.GetConfigResponse.ComputationConfigMapEntry computationConfig :
+        response.getComputationConfigMapList()) {
+      Map<String, String> transformUserNameToStateFamily =
+          transformUserNameToStateFamilyByComputationId.computeIfAbsent(
+              computationConfig.getComputationId(), k -> new HashMap<>());
+      for (Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry entry :
+          computationConfig.getComputationConfig().getTransformUserNameToStateFamilyList()) {
+        transformUserNameToStateFamily.put(entry.getTransformUserName(), entry.getStateFamily());
+      }
+    }
+
+    return transformUserNameToStateFamilyByComputationId;
+  }
+
+  /** Deserializes the {@link MapTask} and populates its MultiOutputInfos. */
+  private static Optional<MapTask> deserializeAndFixMapTask(String serializedMapTask) {
+    try {
+      return Optional.of(
+          StreamingWorkerEnvironment.fixMapTaskMultiOutputInfoFnInstance()
+              .apply(Transport.getJsonFactory().fromString(serializedMapTask, MapTask.class)));
+    } catch (IOException e) {
+      LOG.warn("Parsing MapTask failed: {}", serializedMapTask, e);
+    }
+    return Optional.empty();

Review Comment:
   would it be better to surface an exception so that the loading cache retries? how does this compare to the existing code? Caching an empty Optional seems like it will make processing of this computation on the worker never succeed.
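
   A hedged sketch of that alternative, assuming the parse failure is allowed to propagate out of the CacheLoader (Guava then surfaces it from get() as an ExecutionException and caches nothing, so a later lookup retries); the callers would need to adjust accordingly:

   ```java
   // Illustrative only: rethrow instead of returning Optional.empty() so the
   // LoadingCache does not pin a permanently-empty entry for this computation.
   private static MapTask deserializeAndFixMapTask(String serializedMapTask) throws IOException {
     try {
       return StreamingWorkerEnvironment.fixMapTaskMultiOutputInfoFnInstance()
           .apply(Transport.getJsonFactory().fromString(serializedMapTask, MapTask.class));
     } catch (IOException e) {
       LOG.warn("Parsing MapTask failed: {}", serializedMapTask, e);
       throw e;
     }
   }
   ```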



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+  @Override
+  public Optional<ComputationState> load(String computationId) throws Exception {
+    return Preconditions.checkNotNull(
+        loadAll(ImmutableList.of(computationId)).getOrDefault(computationId, Optional.empty()));
+  }
+
+  @Override
+  public Map<String, Optional<ComputationState>> loadAll(
+      Iterable<? extends String> computationIds) {
+    Optional<Windmill.GetConfigResponse> maybeResponse =
+        streamingApplianceConfigLoader.getComputationConfig(
+            Iterables.getOnlyElement(computationIds));
+    if (!maybeResponse.isPresent()) {
+      return ImmutableMap.of();
+    }
+
+    Windmill.GetConfigResponse response = maybeResponse.get();
+
+    // The max work item commit bytes should be modified to be dynamic once it is available in
+    // the request.
+    for (Windmill.GetConfigResponse.SystemNameToComputationIdMapEntry entry :
+        response.getSystemNameToComputationIdMapList()) {
+      systemNameToComputationIdMap.put(entry.getSystemName(), entry.getComputationId());
+    }
+
+    Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId =
+        transformUserNameToStateFamilyByComputationId(response);
+
+    return response.getCloudWorksList().stream()
+        .map(
+            serializedMapTask ->
+                createComputationState(
+                    serializedMapTask, transformUserNameToStateFamilyByComputationId))
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .collect(toImmutableMap(ComputationState::getComputationId, Optional::of));
+  }
+
+  private Optional<ComputationState> createComputationState(
+      String serializedMapTask,
+      Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId) {
+    return deserializeAndFixMapTask(serializedMapTask)
+        .map(
+            mapTask -> {
+              String computationId =
+                  systemNameToComputationIdMap.getOrDefault(
+                      mapTask.getSystemName(), mapTask.getSystemName());
+              return new ComputationState(
+                  computationId,
+                  mapTask,
+                  workUnitExecutor,
+                  transformUserNameToStateFamilyByComputationId.getOrDefault(
+                      computationId, ImmutableMap.of()),
+                  perComputationStateCacheViewFactory.apply(computationId));

Review Comment:
   it seems like the appliance and SE differ just in how they get the MapTask and the transform user name -> state family map.
   
   What if we had an interface, instead of StreamingConfigLoader<Windmill.GetConfigResponse>, that can vend the MapTask and the transform user name -> state family map for a computation?
   
   Then the ComputationStateCacheLoader could just be a final class taking that loader. The StreamingEngineConfigLoader could implement that interface as well as the other one for the dynamic config, and the appliance could just implement that.
   
   I'm not sure the StreamingConfigLoader templated interface is buying us much since only the SE needs the background threads.
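
   One possible shape for that interface, sketched with illustrative names (ComputationConfigFetcher and ComputationConfig are assumptions here, not the PR's API):

   ```java
   // Vends per-computation config; the appliance and SE paths would each implement this.
   public interface ComputationConfigFetcher {
     Optional<ComputationConfig> getConfig(String computationId);

     // Value type pairing the MapTask with its transform user name -> state family map.
     final class ComputationConfig {
       private final MapTask mapTask;
       private final ImmutableMap<String, String> transformUserNameToStateFamily;

       public ComputationConfig(
           MapTask mapTask, ImmutableMap<String, String> transformUserNameToStateFamily) {
         this.mapTask = mapTask;
         this.transformUserNameToStateFamily = transformUserNameToStateFamily;
       }

       public MapTask mapTask() {
         return mapTask;
       }

       public ImmutableMap<String, String> transformUserNameToStateFamily() {
         return transformUserNameToStateFamily;
       }
     }
   }
   ```

   The ComputationStateCacheLoader could then be a single final class depending only on this fetcher, so the appliance path would not need the templated StreamingConfigLoader or its background threads.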



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
