scwhittle commented on code in PR #30764:
URL: https://github.com/apache/beam/pull/30764#discussion_r1551205078


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java:
##########
@@ -67,7 +67,7 @@ public DataflowWorkProgressUpdater(
     super(worker, Integer.MAX_VALUE);
     this.workItemStatusClient = workItemStatusClient;
     this.workItem = workItem;
-    this.hotKeyLogger = new HotKeyLogger();
+    this.hotKeyLogger = HotKeyLogger.ofSystemClock();

Review Comment:
   Can cleanups be moved to separate PRs? Less churn if things are reverted and 
easier to review and summarize with commit description.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -64,7 +64,7 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-class DataflowWorkUnitClient implements WorkUnitClient {
+public class DataflowWorkUnitClient implements WorkUnitClient {

Review Comment:
   mark these classes you are making public as Internal



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java:
##########
@@ -18,30 +18,38 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import com.google.api.client.util.Clock;
+import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.beam.runners.dataflow.util.TimeUtil;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@NotThreadSafe

Review Comment:
   mark Internal



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java:
##########
@@ -39,7 +39,7 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-class ReaderCache {
+public class ReaderCache {

Review Comment:
   internal



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -599,8 +601,10 @@ public static void main(String[] args) throws Exception {
         StreamingDataflowWorker.class.getSimpleName());
 
     LOG.debug("Creating StreamingDataflowWorker from options: {}", options);
-    StreamingDataflowWorker worker = 
StreamingDataflowWorker.fromOptions(options);
-
+    StreamingWorkerHarness worker =
+        isDirectPathPipeline(options)
+            ? StreamingEngineDirectPathWorkerHarness.fromOptions(options)
+            : StreamingDataflowWorker.fromOptions(options);

Review Comment:
   how about moving the harness portions of StreamingDataflowWorker to a new 
StreamingSingleEndpointWorkerHarness class? (open to better name ideas).
   
   I think that is clearer which parts of this file are non-direct path 
specific and which are shared.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/ActiveWorkState.java:
##########
@@ -124,11 +128,40 @@ private static Stream<HeartbeatRequest> 
toHeartbeatRequestStream(
                     .setWorkToken(work.getWorkItem().getWorkToken())
                     .setCacheToken(work.getWorkItem().getCacheToken())
                     .addAllLatencyAttribution(
-                        work.getLatencyAttributions(
-                            /* isHeartbeat= */ true, 
work.getLatencyTrackingId(), sampler))
+                        work.getLatencyAttributions(/* isHeartbeat= */ true, 
sampler))
                     .build());
   }
 
+  private static Stream<DirectHeartbeatRequest> 
toHeartbeatRequestStreamDirectPath(
+      Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue,
+      Instant refreshDeadline,
+      DataflowExecutionStateSampler sampler) {
+    ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey();
+    Deque<Work> workQueue = shardedKeyAndWorkQueue.getValue();
+
+    return workQueue.stream()
+        .filter(work -> work.getStartTime().isBefore(refreshDeadline))
+        .peek(
+            work -> {
+              if (work.getProcessWorkItemClient().getDataStream().isClosed()) {
+                work.setFailed();
+              }
+            })
+        // Don't send heartbeats for queued work we already know is failed.
+        .filter(work -> !work.isFailed())
+        .map(
+            work ->
+                DirectHeartbeatRequest.create(

Review Comment:
   it seems like it would be more convenient for the caller to get back a 
map<DataStream, heartbeats>, and then we could avoid the other 
DirectHeartbeatRequest class.
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceConfigLoader.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+public final class StreamingApplianceConfigLoader
+    implements StreamingConfigLoader<GetConfigResponse> {
+
+  private final WindmillServerStub windmillServer;
+  private final Consumer<GetConfigResponse> onConfigResponse;
+
+  public StreamingApplianceConfigLoader(
+      WindmillServerStub windmillServer, Consumer<GetConfigResponse> 
onConfigResponse) {
+    this.windmillServer = windmillServer;
+    this.onConfigResponse = onConfigResponse;
+  }
+
+  @Override
+  public void start() {
+    // no-op. Does not perform any asynchronous processing internally.
+  }
+
+  @Override
+  public Optional<GetConfigResponse> getComputationConfig(String 
computationId) {
+    GetConfigResponse getConfigResponse =

Review Comment:
   verify computationid is non-empty



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/ActiveWorkState.java:
##########
@@ -124,11 +128,40 @@ private static Stream<HeartbeatRequest> 
toHeartbeatRequestStream(
                     .setWorkToken(work.getWorkItem().getWorkToken())
                     .setCacheToken(work.getWorkItem().getCacheToken())
                     .addAllLatencyAttribution(
-                        work.getLatencyAttributions(
-                            /* isHeartbeat= */ true, 
work.getLatencyTrackingId(), sampler))
+                        work.getLatencyAttributions(/* isHeartbeat= */ true, 
sampler))
                     .build());
   }
 
+  private static Stream<DirectHeartbeatRequest> 
toHeartbeatRequestStreamDirectPath(

Review Comment:
   It would be nice to have a single helper used by both direct/non-direct 
methods since they are largely the same and could otherwise drift.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/ActiveWorkState.java:
##########
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.streaming;
+package org.apache.beam.runners.dataflow.worker.streaming.computations;

Review Comment:
   computations is a confusing package name to me.  We are grouping by 
computation but the objects are more to do with processing.
   
   However instead of renaming the package how about just leaving these in 
streaming?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingEngineDirectPathWorkerHarness.java:
##########
@@ -0,0 +1,1131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import static org.apache.beam.runners.dataflow.DataflowRunner.hasExperiment;
+import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.auto.value.AutoValue;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.beam.runners.dataflow.internal.CustomSources;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory;
+import org.apache.beam.runners.dataflow.worker.DataflowWorkUnitClient;
+import org.apache.beam.runners.dataflow.worker.HotKeyLogger;
+import org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory;
+import org.apache.beam.runners.dataflow.worker.ReaderCache;
+import org.apache.beam.runners.dataflow.worker.ReaderRegistry;
+import org.apache.beam.runners.dataflow.worker.SinkRegistry;
+import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
+import org.apache.beam.runners.dataflow.worker.WindmillComputationKey;
+import org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem;
+import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.runners.dataflow.worker.graph.Edges;
+import org.apache.beam.runners.dataflow.worker.graph.Networks;
+import org.apache.beam.runners.dataflow.worker.graph.Nodes;
+import 
org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
+import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
+import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
+import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
+import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState;
+import 
org.apache.beam.runners.dataflow.worker.streaming.KeyCommitTooLargeException;
+import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
+import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
+import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import 
org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
+import 
org.apache.beam.runners.dataflow.worker.streaming.computations.ComputationState;
+import 
org.apache.beam.runners.dataflow.worker.streaming.computations.ComputationStateCache;
+import 
org.apache.beam.runners.dataflow.worker.streaming.computations.StreamingEngineComputationStateCacheLoader;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineConfigLoader;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig;
+import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
+import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
+import 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor;
+import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
+import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingEngineWorkCommitter;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.StreamingEngineClient;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
+import org.apache.beam.runners.dataflow.worker.windmill.state.GetDataClient;
+import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.ProcessWorkItemClient;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributors;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.StreamingEngineFailureTracker;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.WorkFailureProcessor;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkRefresher;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.DirectActiveWorkRefresher;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.graph.MutableNetwork;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public final class StreamingEngineDirectPathWorkerHarness implements 
StreamingWorkerHarness {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamingEngineDirectPathWorkerHarness.class);
+  // Controls processing parallelism. Maximum number of threads for 
processing.  Currently, each
+  // thread processes one key at a time.
+  private static final int MAX_PROCESSING_THREADS = 300;

Review Comment:
   There is too much duplication between this and the other harness, which will 
make it difficult to add new features (such as current PR to make processing 
threads dynamic).
   
   It seems like a lot of the things: executor, metrics, reporting, cache etc 
are not affected by how work is obtained, committed or state fetched.  It would 
be better if we could instead keep the logic and just inject different work 
obtainer, committer, state fetcher.
   
   Or alternatively we could make everything work with direct-path by always 
plumbing somethign to use for getdata/commitwork and in the non-direct path 
cases just having a single one. 



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/ComputationState.java:
##########
@@ -74,8 +83,12 @@ public ImmutableMap<String, String> 
getTransformUserNameToStateFamily() {
     return transformUserNameToStateFamily;
   }
 
-  public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue() {
-    return executionStateQueue;
+  public void releaseExecutionState(ExecutionState executionState) {
+    executionStateQueue.offer(executionState);
+  }
+
+  public Optional<ExecutionState> getExecutionState() {

Review Comment:
   name acquireExecutionState or pollExecutionState?
   get makes it sound like a simple accessor



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -64,27 +75,92 @@ private Work(Windmill.WorkItem workItem, Supplier<Instant> 
clock, Consumer<Work>
             .setCacheToken(workItem.getCacheToken())
             .setWorkToken(workItem.getWorkToken())
             .build();
+    this.latencyTrackingId = buildLatencyTrackingId(workItem);

Review Comment:
   rm method and just inline?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.computations;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import 
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader

Review Comment:
   Let's put the config loading changes in a separate PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to