scwhittle commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1613137200


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -87,6 +89,7 @@
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
+@NotThreadSafe

Review Comment:
   Add the @Internal annotation here as well.
   
   Can you also improve the class comment to clarify the difference between
   Work and this class? This one is reused across Work instances.
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -152,37 +154,22 @@ public boolean workIsFailed() {
 
   public void start(
       @Nullable Object key,
-      Windmill.WorkItem work,
-      Instant inputDataWatermark,
-      @Nullable Instant outputDataWatermark,
-      @Nullable Instant synchronizedProcessingTime,
+      Work work,
       WindmillStateReader stateReader,
       SideInputStateFetcher sideInputStateFetcher,
-      Windmill.WorkItemCommitRequest.Builder outputBuilder,
-      @Nullable Supplier<Boolean> workFailed) {
+      Windmill.WorkItemCommitRequest.Builder outputBuilder) {
     this.key = key;
-    this.work = work;
-    this.workIsFailed = (workFailed != null) ? workFailed : () -> Boolean.FALSE;
+    this.work = work.getWorkItem();

Review Comment:
   Should we just store the Work and get rid of workIsFailed?
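
   A rough sketch of the idea, not the actual classes: `FakeWork` and
   `ContextSketch` are hypothetical stand-ins for Work and
   StreamingModeExecutionContext, and `setFailed()` is assumed only so the
   sketch can flip the flag. Only `isFailed()` and `workIsFailed()` appear in
   this diff.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Work; the real class also carries the WorkItem, watermarks, etc.
final class FakeWork {
  private final AtomicBoolean failed = new AtomicBoolean(false);

  // Assumed mutator, present only so the sketch can mark the work as failed.
  void setFailed() {
    failed.set(true);
  }

  boolean isFailed() {
    return failed.get();
  }
}

// Hypothetical stand-in for StreamingModeExecutionContext.
final class ContextSketch {
  // Store the Work itself instead of a WorkItem plus a separate Supplier<Boolean>.
  private FakeWork work;

  void start(FakeWork work) {
    this.work = work;
  }

  // Delegates to the stored Work, so no separate workIsFailed field is needed.
  boolean workIsFailed() {
    return work != null && work.isFailed();
  }
}
```

   With this shape the failure state has a single owner (the Work), and the
   context only reads it.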



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java:
##########
@@ -35,8 +42,13 @@ public abstract class ExecutionState {
 
   public abstract ExecutionStateTracker executionStateTracker();
 
-  public static ExecutionState.Builder builder() {
-    return new AutoValue_ExecutionState.Builder();
+  public final void close() {

Review Comment:
   If it's selectively closed, it's probably better without AutoCloseable. I think I
   got a lint warning about creating something AutoCloseable outside a try block.
   
   Keep the comment, though.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java:
##########
@@ -22,10 +22,19 @@
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoValue
-public abstract class ExecutionState {
+@Internal
+public abstract class ExecutionState implements AutoCloseable {

Review Comment:
   Can you add a comment here?
   We have a bunch of context-y things for processing floating around. It would
   be good to note that this is per-computation and that it can be reused across
   bundles/work, with the things it contains being reset.
   
   Separately, I wonder if we should merge this and
   StreamingModeExecutionStateContext?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -193,31 +180,33 @@ public void start(
       for (StepContext stepContext : stepContexts) {
         stepContext.start(
             stateReader,
-            inputDataWatermark,
+            work.watermarks().inputDataWatermark(),

Review Comment:
   Could pass in the watermarks object instead of separate params (could be a follow-up).
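
   Something along these lines, as a sketch only: `WatermarksSketch` and
   `StepContextSketch` are hypothetical stand-ins for Work.Watermarks and
   StepContext, and java.time.Instant is used in place of Joda-Time's Instant
   purely to keep the example dependency-free.

```java
import java.time.Instant;

// Hypothetical stand-in for Work.Watermarks. java.time.Instant replaces Joda-Time's
// Instant only to keep the sketch dependency-free.
final class WatermarksSketch {
  private final Instant inputDataWatermark;
  private final Instant outputDataWatermark; // may be null
  private final Instant synchronizedProcessingTime; // may be null

  WatermarksSketch(
      Instant inputDataWatermark,
      Instant outputDataWatermark,
      Instant synchronizedProcessingTime) {
    this.inputDataWatermark = inputDataWatermark;
    this.outputDataWatermark = outputDataWatermark;
    this.synchronizedProcessingTime = synchronizedProcessingTime;
  }

  Instant inputDataWatermark() {
    return inputDataWatermark;
  }

  Instant outputDataWatermark() {
    return outputDataWatermark;
  }

  Instant synchronizedProcessingTime() {
    return synchronizedProcessingTime;
  }
}

// Hypothetical stand-in for StepContext.
final class StepContextSketch {
  private WatermarksSketch watermarks;

  // One parameter replaces the three separate Instant parameters.
  void start(WatermarksSketch watermarks) {
    this.watermarks = watermarks;
  }

  Instant inputDataWatermark() {
    return watermarks.inputDataWatermark();
  }
}
```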



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ExecuteWorkResult.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+
+/** Value class that represents the result of executing user DoFns. */
+@AutoValue
+abstract class ExecuteWorkResult {

Review Comment:
   Seems like this is only used within a single class. How about making it a
   static nested class there instead of a separate file?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -116,6 +118,11 @@ public boolean activateWork(ShardedKey shardedKey, Work work) {
     }
   }
 
+  public boolean activateWork(Work work) {

Review Comment:
   Add a cleanup/TODO comment for now?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java:
##########
@@ -97,18 +104,19 @@ public final class StreamingEngineClient {
   private StreamingEngineClient(
       JobHeader jobHeader,
       GetWorkBudget totalGetWorkBudget,
-      AtomicReference<StreamingEngineConnectionState> connections,
       GrpcWindmillStreamFactory streamFactory,
-      WorkItemProcessor workItemProcessor,
+      WorkItemScheduler workItemScheduler,
       ChannelCachingStubFactory channelCachingStubFactory,
       GetWorkBudgetDistributor getWorkBudgetDistributor,
       GrpcDispatcherClient dispatcherClient,
-      long clientId) {
+      long clientId,
+      Function<WindmillStream.CommitWorkStream, WorkCommitter> workCommitterFactory,
+      Consumer<List<Windmill.ComputationHeartbeatResponse>> heartbeatResponseProcessor) {
     this.jobHeader = jobHeader;
     this.started = new AtomicBoolean();

Review Comment:
   I think started could just be a boolean now that it is guarded by synchronized blocks.
   
   Remove the places where it is set to true other than in the start method
   (i.e. in startMetadataStream).
   Also consider just inlining startMetadataStream.
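
   A minimal sketch of what that could look like. `ClientSketch` is a
   hypothetical stand-in for StreamingEngineClient reduced to the flag, and
   throwing on a second start() is an assumption of the sketch, not something
   this diff establishes.

```java
// Hypothetical stand-in for StreamingEngineClient, reduced to the started flag.
final class ClientSketch {
  // A plain boolean is enough because every access holds this object's monitor.
  private boolean started = false;
  // Counts how often the inlined metadata stream start logic ran; illustration only.
  private int metadataStreamStarts = 0;

  synchronized void start() {
    // Assumption: starting twice is an error. The real client may behave differently.
    if (started) {
      throw new IllegalStateException("already started");
    }
    // Inlined body of what used to be startMetadataStream().
    metadataStreamStarts++;
    // The single place where the flag becomes true.
    started = true;
  }

  synchronized boolean isStarted() {
    return started;
  }

  synchronized int metadataStreamStarts() {
    return metadataStreamStarts;
  }
}
```

   Keeping the write in one synchronized method makes it easy to see that the
   flag and the stream start cannot get out of step.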



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory;
+import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
+import org.apache.beam.runners.dataflow.worker.HotKeyLogger;
+import org.apache.beam.runners.dataflow.worker.ReaderCache;
+import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException;
+import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
+import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState;
+import org.apache.beam.runners.dataflow.worker.streaming.KeyCommitTooLargeException;
+import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
+import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
+import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.WorkFailureProcessor;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Schedules execution of user code to process a {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem} then commits the work item
+ * back to streaming execution backend.
+ */
+@Internal
+@ThreadSafe
+public final class StreamingWorkScheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingWorkScheduler.class);
+
+  private final DataflowWorkerHarnessOptions options;
+  private final Supplier<Instant> clock;
+  private final ExecutionStateFactory executionStateFactory;
+  private final SideInputStateFetcher sideInputStateFetcher;
+  private final FailureTracker failureTracker;
+  private final WorkFailureProcessor workFailureProcessor;
+  private final StreamingCommitFinalizer commitFinalizer;
+  private final StreamingCounters streamingCounters;
+  private final HotKeyLogger hotKeyLogger;
+  private final ConcurrentMap<String, StageInfo> stageInfoMap;
+  private final DataflowExecutionStateSampler sampler;
+  private final AtomicInteger maxWorkItemCommitBytes;
+
+  public StreamingWorkScheduler(
+      DataflowWorkerHarnessOptions options,
+      Supplier<Instant> clock,
+      ExecutionStateFactory executionStateFactory,
+      SideInputStateFetcher sideInputStateFetcher,
+      FailureTracker failureTracker,
+      WorkFailureProcessor workFailureProcessor,
+      StreamingCommitFinalizer commitFinalizer,
+      StreamingCounters streamingCounters,
+      HotKeyLogger hotKeyLogger,
+      ConcurrentMap<String, StageInfo> stageInfoMap,
+      DataflowExecutionStateSampler sampler,
+      AtomicInteger maxWorkItemCommitBytes) {
+    this.options = options;
+    this.clock = clock;
+    this.executionStateFactory = executionStateFactory;
+    this.sideInputStateFetcher = sideInputStateFetcher;
+    this.failureTracker = failureTracker;
+    this.workFailureProcessor = workFailureProcessor;
+    this.commitFinalizer = commitFinalizer;
+    this.streamingCounters = streamingCounters;
+    this.hotKeyLogger = hotKeyLogger;
+    this.stageInfoMap = stageInfoMap;
+    this.sampler = sampler;
+    this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
+  }
+
+  public static StreamingWorkScheduler create(
+      DataflowWorkerHarnessOptions options,
+      Supplier<Instant> clock,
+      ReaderCache readerCache,
+      DataflowMapTaskExecutorFactory mapTaskExecutorFactory,
+      BoundedQueueExecutor workExecutor,
+      Function<String, WindmillStateCache.ForComputation> stateCacheFactory,
+      Function<Windmill.GlobalDataRequest, Windmill.GlobalData> fetchGlobalDataFn,
+      FailureTracker failureTracker,
+      WorkFailureProcessor workFailureProcessor,
+      StreamingCounters streamingCounters,
+      HotKeyLogger hotKeyLogger,
+      DataflowExecutionStateSampler sampler,
+      AtomicInteger maxWorkItemCommitBytes,
+      IdGenerator idGenerator,
+      ConcurrentMap<String, StageInfo> stageInfoMap) {
+    ExecutionStateFactory executionStateFactory =
+        new ExecutionStateFactory(
+            options,
+            mapTaskExecutorFactory,
+            readerCache,
+            stateCacheFactory,
+            sampler,
+            streamingCounters.pendingDeltaCounters(),
+            idGenerator);
+
+    return new StreamingWorkScheduler(
+        options,
+        clock,
+        executionStateFactory,
+        new SideInputStateFetcher(fetchGlobalDataFn, options),
+        failureTracker,
+        workFailureProcessor,
+        StreamingCommitFinalizer.create(workExecutor),
+        streamingCounters,
+        hotKeyLogger,
+        stageInfoMap,
+        sampler,
+        maxWorkItemCommitBytes);
+  }
+
+  private static long computeShuffleBytesRead(Windmill.WorkItem workItem) {
+    return workItem.getMessageBundlesList().stream()
+        .flatMap(bundle -> bundle.getMessagesList().stream())
+        .map(Windmill.Message::getSerializedSize)
+        .map(size -> (long) size)
+        .reduce(0L, Long::sum);
+  }
+
+  private static Windmill.WorkItemCommitRequest.Builder initializeOutputBuilder(
+      ByteString key, Windmill.WorkItem workItem) {
+    return Windmill.WorkItemCommitRequest.newBuilder()
+        .setKey(key)
+        .setShardingKey(workItem.getShardingKey())
+        .setWorkToken(workItem.getWorkToken())
+        .setCacheToken(workItem.getCacheToken());
+  }
+
+  private static Windmill.WorkItemCommitRequest buildWorkItemTruncationRequest(
+      ByteString key, Windmill.WorkItem workItem, int estimatedCommitSize) {
+    Windmill.WorkItemCommitRequest.Builder outputBuilder = initializeOutputBuilder(key, workItem);
+    outputBuilder.setExceedsMaxWorkItemCommitBytes(true);
+    outputBuilder.setEstimatedWorkItemCommitBytes(estimatedCommitSize);
+    return outputBuilder.build();
+  }
+
+  /** Sets the stage name and workId of the Thread executing the {@link Work} for logging. */
+  private static void setUpWorkLoggingContext(String workLatencyTrackingId, String computationId) {
+    DataflowWorkerLoggingMDC.setWorkId(workLatencyTrackingId);
+    DataflowWorkerLoggingMDC.setStageName(computationId);
+  }
+
+  private static String getShuffleTaskStepName(MapTask mapTask) {
+    // The MapTask instruction is ordered by dependencies, such that the first element is
+    // always going to be the shuffle task.
+    return mapTask.getInstructions().get(0).getName();
+  }
+
+  private static long computeSourceBytesProcessed(
+      DataflowWorkExecutor workExecutor, String sourceBytesCounterName) {
+    HashMap<String, ElementCounter> counters =
+        ((DataflowMapTaskExecutor) workExecutor)
+            .getReadOperation()
+            .receivers[0]
+            .getOutputCounters();
+
+    return Optional.ofNullable(counters.get(sourceBytesCounterName))
+        .map(counter -> ((OutputObjectAndByteCounter) counter).getByteCount().getAndReset())
+        .orElse(0L);
+  }
+
+  /** Resets logging context of the Thread executing the {@link Work} for logging. */
+  private void resetWorkLoggingContext(String workLatencyTrackingId) {
+    sampler.resetForWorkId(workLatencyTrackingId);
+    DataflowWorkerLoggingMDC.setWorkId(null);
+    DataflowWorkerLoggingMDC.setStageName(null);
+  }
+
+  /**
+   * Schedule work for execution. Work may be executed immediately, or queued and executed in the
+   * future. Only one work may be "active" (currently executing) per key at a time.
+   */
+  public void scheduleWork(
+      ComputationState computationState,
+      Windmill.WorkItem workItem,
+      Work.Watermarks watermarks,
+      Work.ProcessingContext.WithProcessWorkFn processingContext,
+      Collection<Windmill.LatencyAttribution> getWorkStreamLatencies) {
+    Work scheduledWork =
+        Work.create(
+            workItem,
+            watermarks,
+            processingContext.setProcessWorkFnAndBuild(work -> execute(computationState, work)),

Review Comment:
   Instead of binding the execute fn here, how about we bind the execution to
   ComputationState as a Consumer<Work> when constructing it?
   
   Then I think this method could just take a Work, and work creation would
   be simpler since there wouldn't be this lazy binding.
   
   Then ComputationState, instead of scheduling the work as a runnable directly,
   would schedule that consumer bound with the work.
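
   To illustrate the shape I mean (a sketch under assumptions, not the real
   classes): `WorkSketch` and `ComputationStateSketch` are hypothetical
   stand-ins for Work and ComputationState, and a plain
   java.util.concurrent.Executor stands in for the worker's executor.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical stand-in for Work: plain data, no execution callback bound into it.
final class WorkSketch {
  private final String id;

  WorkSketch(String id) {
    this.id = id;
  }

  String id() {
    return id;
  }
}

// Hypothetical stand-in for ComputationState.
final class ComputationStateSketch {
  private final Executor executor;
  // Bound once at construction time, so Work creation needs no lazy binding.
  private final Consumer<WorkSketch> processWorkFn;

  ComputationStateSketch(Executor executor, Consumer<WorkSketch> processWorkFn) {
    this.executor = executor;
    this.processWorkFn = processWorkFn;
  }

  // Schedules the consumer bound with the work, rather than the work as a Runnable.
  void activateWork(WorkSketch work) {
    executor.execute(() -> processWorkFn.accept(work));
  }
}
```

   The scheduler would then hand in something like `work -> execute(state, work)`
   once per computation, and scheduleWork would only need the Work itself.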



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import java.time.Duration;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StreamingCommitFinalizer {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingCommitFinalizer.class);
+  private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = Duration.ofMinutes(5L);
+  private final Cache<Long, Runnable> onCommitFinalizedCache;
+  private final BoundedQueueExecutor workExecutor;
+
+  private StreamingCommitFinalizer(
+      Cache<Long, Runnable> onCommitFinalizedCache, BoundedQueueExecutor workExecutor) {
+    this.onCommitFinalizedCache = onCommitFinalizedCache;
+    this.workExecutor = workExecutor;
+  }
+
+  static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
+    return new StreamingCommitFinalizer(
+        CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(),
+        workExecutor);
+  }
+
+  /**
+   * Stores a map of user worker generated id's and callbacks to execute once a commit has been
+   * successfully committed to the backing state store.
+   */
+  void cacheCommitFinalizers(Map<Long, Runnable> commitCallbacks) {
+    onCommitFinalizedCache.putAll(commitCallbacks);
+  }
+
+  /**
+   * Calls callbacks for WorkItem to mark that commit has been persisted (finalized) to the backing
+   * state store and to checkpoint the source.
+   */
+  void finalizeCommits(Windmill.WorkItem work) {

Review Comment:
   Just take in an iterable of longs, or change this to a non-batch finalizeCommit
   method taking a long.
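
   Roughly like this, as a sketch: `CommitFinalizerSketch` is a hypothetical
   stand-in for StreamingCommitFinalizer. A ConcurrentHashMap replaces the
   expiring Guava cache, and callbacks run inline instead of on the work
   executor, both only to keep the example self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for StreamingCommitFinalizer.
final class CommitFinalizerSketch {
  // Stand-in for the expiring cache of finalize id -> callback.
  private final Map<Long, Runnable> callbacks = new ConcurrentHashMap<>();

  void cacheCommitFinalizers(Map<Long, Runnable> commitCallbacks) {
    callbacks.putAll(commitCallbacks);
  }

  // Takes only the ids, so this class no longer depends on the WorkItem proto.
  void finalizeCommits(Iterable<Long> finalizeIds) {
    for (Long finalizeId : finalizeIds) {
      // Remove before running so each callback executes at most once.
      Runnable callback = callbacks.remove(finalizeId);
      if (callback != null) {
        // Assumption of the sketch: run inline; the real class hands this to an executor.
        callback.run();
      }
    }
  }

  int pendingCallbacks() {
    return callbacks.size();
  }
}
```

   The caller would then extract the finalize ids from the WorkItem itself and
   pass just those in.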



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -151,37 +154,22 @@ public boolean workIsFailed() {
 
   public void start(
       @Nullable Object key,
-      Windmill.WorkItem work,
-      Instant inputDataWatermark,
-      @Nullable Instant outputDataWatermark,
-      @Nullable Instant synchronizedProcessingTime,
+      Work work,
       WindmillStateReader stateReader,
       SideInputStateFetcher sideInputStateFetcher,
-      Windmill.WorkItemCommitRequest.Builder outputBuilder,
-      @Nullable Supplier<Boolean> workFailed) {
+      Windmill.WorkItemCommitRequest.Builder outputBuilder) {
     this.key = key;
-    this.work = work;
-    this.workIsFailed = (workFailed != null) ? workFailed : () -> Boolean.FALSE;
+    this.work = work.getWorkItem();
+    this.workIsFailed = work::isFailed;
     this.computationKey =
-        WindmillComputationKey.create(computationId, work.getKey(), work.getShardingKey());
+        WindmillComputationKey.create(
+            computationId, work.getWorkItem().getKey(), work.getWorkItem().getShardingKey());

Review Comment:
   nit: use work.shardedKey() and the constructor taking a ShardedKey



