This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 89d5e2f2961 Validate commits in StreamingDataflowWorker (#31822)
89d5e2f2961 is described below

commit 89d5e2f29615c1d4dba41c42a2161d5c5d5f39a8
Author: Andrew Crites <[email protected]>
AuthorDate: Tue Jul 30 09:17:21 2024 -0700

    Validate commits in StreamingDataflowWorker (#31822)
    
    * Grabs operational limits from streaming config and plumbs them to
      WindmillSink, which can then throw an exception or log an error if outputs
      are too large.
---
 .../runners/dataflow/worker/OperationalLimits.java | 64 ++++++++++++++
 .../dataflow/worker/OutputTooLargeException.java   | 38 +++++++++
 .../dataflow/worker/StreamingDataflowWorker.java   | 44 ++++++----
 .../worker/StreamingModeExecutionContext.java      | 18 ++++
 .../beam/runners/dataflow/worker/WindmillSink.java | 25 ++++++
 .../worker/streaming/ComputationWorkExecutor.java  |  7 +-
 .../StreamingEngineComputationConfigFetcher.java   | 16 +++-
 .../config/StreamingEnginePipelineConfig.java      | 10 +++
 .../work/processing/StreamingWorkScheduler.java    | 22 +++--
 .../worker/StreamingDataflowWorkerTest.java        | 98 ++++++++++++++++++++--
 .../worker/StreamingModeExecutionContextTest.java  |  2 +
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  4 +-
 12 files changed, 311 insertions(+), 37 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
new file mode 100644
index 00000000000..e9ee8f39cba
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import com.google.auto.value.AutoBuilder;
+
+/** Immutable backend operational limits; {@link #builder()} defaults are effectively unlimited. */
+public class OperationalLimits {
+  /** Maximum serialized size, in bytes, of the commit produced by a single work item. */
+  public final long maxWorkItemCommitBytes;
+  /** Maximum serialized size, in bytes, of a single output element's key. */
+  public final long maxOutputKeyBytes;
+  /** Maximum serialized size, in bytes, of a single output element's value. */
+  public final long maxOutputValueBytes;
+  /** Whether exceeding an output key/value limit throws instead of only logging an error. */
+  public final boolean throwExceptionOnLargeOutput;
+
+  OperationalLimits(
+      long maxWorkItemCommitBytes,
+      long maxOutputKeyBytes,
+      long maxOutputValueBytes,
+      boolean throwExceptionOnLargeOutput) {
+    this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
+    this.maxOutputKeyBytes = maxOutputKeyBytes;
+    this.maxOutputValueBytes = maxOutputValueBytes;
+    this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput;
+  }
+
+  @AutoBuilder(ofClass = OperationalLimits.class)
+  public interface Builder {
+    Builder setMaxWorkItemCommitBytes(long bytes);
+
+    Builder setMaxOutputKeyBytes(long bytes);
+
+    Builder setMaxOutputValueBytes(long bytes);
+
+    Builder setThrowExceptionOnLargeOutput(boolean shouldThrow);
+
+    OperationalLimits build();
+  }
+
+  public static Builder builder() {
+    return new AutoBuilder_OperationalLimits_Builder()
+        .setMaxWorkItemCommitBytes(Long.MAX_VALUE)
+        .setMaxOutputKeyBytes(Long.MAX_VALUE)
+        .setMaxOutputValueBytes(Long.MAX_VALUE)
+        .setThrowExceptionOnLargeOutput(false);
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java
new file mode 100644
index 00000000000..9f4b413841c
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Unchecked exception indicating that an output element exceeded a size limit. */
+public class OutputTooLargeException extends RuntimeException {
+  public OutputTooLargeException(String reason) {
+    super(reason);
+  }
+
+  /** Returns whether {@code t} (which may be null) or any of its causes is of this type. */
+  public static boolean isCausedByOutputTooLargeException(@Nullable Throwable t) {
+    while (t != null) {
+      if (t instanceof OutputTooLargeException) {
+        return true;
+      }
+      t = t.getCause();
+    }
+    return false;
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 718d93830c4..a07bbfa7f5f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -35,7 +35,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -177,7 +177,7 @@ public class StreamingDataflowWorker {
       WorkFailureProcessor workFailureProcessor,
       StreamingCounters streamingCounters,
       MemoryMonitor memoryMonitor,
-      AtomicInteger maxWorkItemCommitBytes,
+      AtomicReference<OperationalLimits> operationalLimits,
       GrpcWindmillStreamFactory windmillStreamFactory,
       Function<String, ScheduledExecutorService> executorSupplier,
       ConcurrentMap<String, StageInfo> stageInfoMap) {
@@ -296,7 +296,7 @@ public class StreamingDataflowWorker {
             streamingCounters,
             hotKeyLogger,
             sampler,
-            maxWorkItemCommitBytes,
+            operationalLimits,
             ID_GENERATOR,
             stageInfoMap);
 
@@ -304,7 +304,6 @@ public class StreamingDataflowWorker {
     LOG.debug("WindmillServiceEndpoint: {}", 
options.getWindmillServiceEndpoint());
     LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
-    LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes.get());
   }
 
   public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions options) {
@@ -314,7 +313,8 @@ public class StreamingDataflowWorker {
     StreamingCounters streamingCounters = StreamingCounters.create();
     WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, 
LOG);
     BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
-    AtomicInteger maxWorkItemCommitBytes = new 
AtomicInteger(Integer.MAX_VALUE);
+    AtomicReference<OperationalLimits> operationalLimits =
+        new AtomicReference<>(OperationalLimits.builder().build());
     WindmillStateCache windmillStateCache =
         WindmillStateCache.builder()
             .setSizeMb(options.getWorkerCacheMb())
@@ -332,7 +332,7 @@ public class StreamingDataflowWorker {
             createConfigFetcherComputationStateCacheAndWindmillClient(
                 options,
                 dataflowServiceClient,
-                maxWorkItemCommitBytes,
+                operationalLimits,
                 windmillStreamFactoryBuilder,
                 configFetcher ->
                     ComputationStateCache.create(
@@ -390,7 +390,7 @@ public class StreamingDataflowWorker {
         workFailureProcessor,
         streamingCounters,
         memoryMonitor,
-        maxWorkItemCommitBytes,
+        operationalLimits,
         
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
         executorSupplier,
         stageInfo);
@@ -406,7 +406,7 @@ public class StreamingDataflowWorker {
       createConfigFetcherComputationStateCacheAndWindmillClient(
           DataflowWorkerHarnessOptions options,
           WorkUnitClient dataflowServiceClient,
-          AtomicInteger maxWorkItemCommitBytes,
+          AtomicReference<OperationalLimits> operationalLimits,
           GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
           Function<ComputationConfig.Fetcher, ComputationStateCache> 
computationStateCacheFactory) {
     ComputationConfig.Fetcher configFetcher;
@@ -422,8 +422,9 @@ public class StreamingDataflowWorker {
               config ->
                   onPipelineConfig(
                       config,
+                      options,
                       dispatcherClient::consumeWindmillDispatcherEndpoints,
-                      maxWorkItemCommitBytes));
+                      operationalLimits::set));
       computationStateCache = 
computationStateCacheFactory.apply(configFetcher);
       windmillStreamFactory =
           windmillStreamFactoryBuilder
@@ -469,9 +470,9 @@ public class StreamingDataflowWorker {
       Supplier<Instant> clock,
       Function<String, ScheduledExecutorService> executorSupplier,
       int localRetryTimeoutMs,
-      int maxWorkItemCommitBytesOverrides) {
+      OperationalLimits limits) {
     ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
-    AtomicInteger maxWorkItemCommitBytes = new 
AtomicInteger(maxWorkItemCommitBytesOverrides);
+    AtomicReference<OperationalLimits> operationalLimits = new 
AtomicReference<>(limits);
     BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
     WindmillStateCache stateCache =
         WindmillStateCache.builder()
@@ -488,8 +489,9 @@ public class StreamingDataflowWorker {
                 config ->
                     onPipelineConfig(
                         config,
+                        options,
                         windmillServer::setWindmillServiceEndpoints,
-                        maxWorkItemCommitBytes))
+                        operationalLimits::set))
             : new 
StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
     ConcurrentMap<String, String> stateNameMap =
         new ConcurrentHashMap<>(prePopulatedStateNameMappings);
@@ -557,7 +559,7 @@ public class StreamingDataflowWorker {
         workFailureProcessor,
         streamingCounters,
         memoryMonitor,
-        maxWorkItemCommitBytes,
+        operationalLimits,
         options.isEnableStreamingEngine()
             ? windmillStreamFactory
                 .setHealthCheckIntervalMillis(
@@ -570,12 +572,18 @@ public class StreamingDataflowWorker {
 
   private static void onPipelineConfig(
       StreamingEnginePipelineConfig config,
+      DataflowWorkerHarnessOptions options,
       Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
-      AtomicInteger maxWorkItemCommitBytes) {
-    if (config.maxWorkItemCommitBytes() != maxWorkItemCommitBytes.get()) {
-      LOG.info("Setting maxWorkItemCommitBytes to {}", maxWorkItemCommitBytes);
-      maxWorkItemCommitBytes.set((int) config.maxWorkItemCommitBytes());
-    }
+      Consumer<OperationalLimits> operationalLimits) {
+
+    operationalLimits.accept(
+        OperationalLimits.builder()
+            .setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes())
+            .setMaxOutputKeyBytes(config.maxOutputKeyBytes())
+            .setMaxOutputValueBytes(config.maxOutputValueBytes())
+            .setThrowExceptionOnLargeOutput(
+                DataflowRunner.hasExperiment(options, 
"throw_exceptions_on_large_output"))
+            .build());
 
     if (!config.windmillServiceEndpoints().isEmpty()) {
       
consumeWindmillServiceEndpoints.accept(config.windmillServiceEndpoints());
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index dd6353060ab..a594dbb1e0f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -129,6 +129,10 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
   private Work work;
   private WindmillComputationKey computationKey;
   private SideInputStateFetcher sideInputStateFetcher;
+  // OperationalLimits is updated in start() because a 
StreamingModeExecutionContext can
+  // be used for processing many work items and these values can change during 
the context's
+  // lifetime. start() is called for each work item.
+  private OperationalLimits operationalLimits;
   private Windmill.WorkItemCommitRequest.Builder outputBuilder;
 
   /**
@@ -168,6 +172,18 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
     return backlogBytes;
   }
 
+  /** Returns the max serialized output key size, in bytes, set by the last start() call. */
+  public long getMaxOutputKeyBytes() {
+    return operationalLimits.maxOutputKeyBytes;
+  }
+
+  /** Returns the max serialized output value size, in bytes, set by the last start() call. */
+  public long getMaxOutputValueBytes() {
+    return operationalLimits.maxOutputValueBytes;
+  }
+
+  /** Returns whether outputs over the key/value limits should throw instead of being logged. */
+  public boolean throwExceptionsForLargeOutput() {
+    return operationalLimits.throwExceptionOnLargeOutput;
+  }
+
   public boolean workIsFailed() {
     return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
   }
@@ -177,11 +193,13 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
       Work work,
       WindmillStateReader stateReader,
       SideInputStateFetcher sideInputStateFetcher,
+      OperationalLimits operationalLimits,
       Windmill.WorkItemCommitRequest.Builder outputBuilder) {
     this.key = key;
     this.work = work;
     this.computationKey = WindmillComputationKey.create(computationId, 
work.getShardedKey());
     this.sideInputStateFetcher = sideInputStateFetcher;
+    this.operationalLimits = operationalLimits;
     this.outputBuilder = outputBuilder;
     this.sideInputCache.clear();
     clearSinkFullHint();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index 1f26572941a..78d0c6b4550 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -44,6 +44,8 @@ import 
org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
 import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({
   "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
@@ -54,6 +56,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
   private final Coder<T> valueCoder;
   private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
   private StreamingModeExecutionContext context;
+  private static final Logger LOG = 
LoggerFactory.getLogger(WindmillSink.class);
 
   WindmillSink(
       String destinationName,
@@ -172,6 +175,28 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
         key = context.getSerializedKey();
         value = encode(valueCoder, data.getValue());
       }
+      if (key.size() > context.getMaxOutputKeyBytes()) {
+        if (context.throwExceptionsForLargeOutput()) {
+          throw new OutputTooLargeException("Key too large: " + key.size());
+        } else {
+          LOG.error(
+              "Trying to output too large key with size "
+                  + key.size()
+                  + ". Limit is "
+                  + context.getMaxOutputKeyBytes());
+        }
+      }
+      if (value.size() > context.getMaxOutputValueBytes()) {
+        if (context.throwExceptionsForLargeOutput()) {
+          throw new OutputTooLargeException("Value too large: " + 
value.size());
+        } else {
+          LOG.error(
+              "Trying to output too large value with size "
+                  + value.size()
+                  + ". Limit is "
+                  + context.getMaxOutputValueBytes());
+        }
+      }
 
       Windmill.KeyedMessageBundle.Builder keyedOutput = productionMap.get(key);
       if (keyedOutput == null) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
index dd34e85bc93..8a00194887d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
@@ -24,6 +24,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
 import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
 import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
@@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory;
  * @implNote Once closed, it cannot be reused.
  */
 // TODO(m-trieu): See if this can be combined/cleaned up with 
StreamingModeExecutionContext as the
-// seperation of responsibilities are unclear.
+// separation of responsibilities are unclear.
 @AutoValue
 @Internal
 @NotThreadSafe
@@ -72,9 +73,11 @@ public abstract class ComputationWorkExecutor {
       Work work,
       WindmillStateReader stateReader,
       SideInputStateFetcher sideInputStateFetcher,
+      OperationalLimits operationalLimits,
       Windmill.WorkItemCommitRequest.Builder outputBuilder)
       throws Exception {
-    context().start(key, work, stateReader, sideInputStateFetcher, 
outputBuilder);
+    context()
+        .start(key, work, stateReader, sideInputStateFetcher, 
operationalLimits, outputBuilder);
     workExecutor().execute();
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
index 51d1507af5f..850e8c3f24b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
@@ -157,7 +157,7 @@ public final class StreamingEngineComputationConfigFetcher 
implements Computatio
     }
   }
 
-  private static StreamingEnginePipelineConfig 
createPipelineConfig(StreamingConfigTask config) {
+  private StreamingEnginePipelineConfig 
createPipelineConfig(StreamingConfigTask config) {
     StreamingEnginePipelineConfig.Builder pipelineConfig = 
StreamingEnginePipelineConfig.builder();
     if (config.getUserStepToStateFamilyNameMap() != null) {
       
pipelineConfig.setUserStepToStateFamilyNameMap(config.getUserStepToStateFamilyNameMap());
@@ -187,6 +187,18 @@ public final class StreamingEngineComputationConfigFetcher 
implements Computatio
       
pipelineConfig.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue());
     }
 
+    if (config.getOperationalLimits() != null) {
+      if (config.getOperationalLimits().getMaxKeyBytes() > 0
+          && config.getOperationalLimits().getMaxKeyBytes() <= 
Integer.MAX_VALUE) {
+        
pipelineConfig.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes());
+      }
+      if (config.getOperationalLimits().getMaxProductionOutputBytes() > 0
+          && config.getOperationalLimits().getMaxProductionOutputBytes() <= 
Integer.MAX_VALUE) {
+        pipelineConfig.setMaxOutputValueBytes(
+            config.getOperationalLimits().getMaxProductionOutputBytes());
+      }
+    }
+
     return pipelineConfig.build();
   }
 
@@ -273,7 +285,7 @@ public final class StreamingEngineComputationConfigFetcher 
implements Computatio
 
   private Optional<StreamingEnginePipelineConfig> fetchGlobalConfig() {
     return fetchConfigWithRetry(dataflowServiceClient::getGlobalStreamingConfigWorkItem)
-        .map(StreamingEngineComputationConfigFetcher::createPipelineConfig);
+        .map(this::createPipelineConfig);
   }
 
   @FunctionalInterface
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
index b5b761ada70..8f1ff93f6a4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
@@ -34,12 +34,18 @@ public abstract class StreamingEnginePipelineConfig {
   public static StreamingEnginePipelineConfig.Builder builder() {
     return new AutoValue_StreamingEnginePipelineConfig.Builder()
         .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES)
+        .setMaxOutputKeyBytes(Long.MAX_VALUE)
+        .setMaxOutputValueBytes(Long.MAX_VALUE)
         .setUserStepToStateFamilyNameMap(new HashMap<>())
         .setWindmillServiceEndpoints(ImmutableSet.of());
   }
 
   public abstract long maxWorkItemCommitBytes();
 
+  public abstract long maxOutputKeyBytes();
+
+  public abstract long maxOutputValueBytes();
+
   public abstract Map<String, String> userStepToStateFamilyNameMap();
 
   public abstract ImmutableSet<HostAndPort> windmillServiceEndpoints();
@@ -48,6 +54,10 @@ public abstract class StreamingEnginePipelineConfig {
   public abstract static class Builder {
     public abstract Builder setMaxWorkItemCommitBytes(long value);
 
+    public abstract Builder setMaxOutputKeyBytes(long value);
+
+    public abstract Builder setMaxOutputValueBytes(long value);
+
     public abstract Builder setUserStepToStateFamilyNameMap(Map<String, 
String> value);
 
     public abstract Builder 
setWindmillServiceEndpoints(ImmutableSet<HostAndPort> value);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index 334ab8efeae..e9ffa982925 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.concurrent.ThreadSafe;
@@ -31,6 +31,7 @@ import 
org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
 import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory;
 import org.apache.beam.runners.dataflow.worker.HotKeyLogger;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
 import org.apache.beam.runners.dataflow.worker.ReaderCache;
 import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException;
 import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
@@ -81,7 +82,7 @@ public final class StreamingWorkScheduler {
   private final HotKeyLogger hotKeyLogger;
   private final ConcurrentMap<String, StageInfo> stageInfoMap;
   private final DataflowExecutionStateSampler sampler;
-  private final AtomicInteger maxWorkItemCommitBytes;
+  private final AtomicReference<OperationalLimits> operationalLimits;
 
   public StreamingWorkScheduler(
       DataflowWorkerHarnessOptions options,
@@ -95,7 +96,7 @@ public final class StreamingWorkScheduler {
       HotKeyLogger hotKeyLogger,
       ConcurrentMap<String, StageInfo> stageInfoMap,
       DataflowExecutionStateSampler sampler,
-      AtomicInteger maxWorkItemCommitBytes) {
+      AtomicReference<OperationalLimits> operationalLimits) {
     this.options = options;
     this.clock = clock;
     this.computationWorkExecutorFactory = computationWorkExecutorFactory;
@@ -107,7 +108,7 @@ public final class StreamingWorkScheduler {
     this.hotKeyLogger = hotKeyLogger;
     this.stageInfoMap = stageInfoMap;
     this.sampler = sampler;
-    this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
+    this.operationalLimits = operationalLimits;
   }
 
   public static StreamingWorkScheduler create(
@@ -123,7 +124,7 @@ public final class StreamingWorkScheduler {
       StreamingCounters streamingCounters,
       HotKeyLogger hotKeyLogger,
       DataflowExecutionStateSampler sampler,
-      AtomicInteger maxWorkItemCommitBytes,
+      AtomicReference<OperationalLimits> operationalLimits,
       IdGenerator idGenerator,
       ConcurrentMap<String, StageInfo> stageInfoMap) {
     ComputationWorkExecutorFactory computationWorkExecutorFactory =
@@ -148,7 +149,7 @@ public final class StreamingWorkScheduler {
         hotKeyLogger,
         stageInfoMap,
         sampler,
-        maxWorkItemCommitBytes);
+        operationalLimits);
   }
 
   private static long computeShuffleBytesRead(Windmill.WorkItem workItem) {
@@ -292,7 +293,7 @@ public final class StreamingWorkScheduler {
       Windmill.WorkItemCommitRequest commitRequest,
       String computationId,
       Windmill.WorkItem workItem) {
-    int byteLimit = maxWorkItemCommitBytes.get();
+    long byteLimit = operationalLimits.get().maxWorkItemCommitBytes;
     int commitSize = commitRequest.getSerializedSize();
     int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;
 
@@ -375,7 +376,12 @@ public final class StreamingWorkScheduler {
 
       // Blocks while executing work.
       computationWorkExecutor.executeWork(
-          executionKey, work, stateReader, localSideInputStateFetcher, 
outputBuilder);
+          executionKey,
+          work,
+          stateReader,
+          localSideInputStateFetcher,
+          operationalLimits.get(),
+          outputBuilder);
 
       if (work.isFailed()) {
         throw new WorkItemCancelledException(workItem.getShardingKey());
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index 52bc61e5991..8a4369fdbd8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -549,7 +549,6 @@ public class StreamingDataflowWorkerTest {
       List<Long> inputs,
       List<Timer> timers)
       throws Exception {
-    // Windmill.GetWorkResponse.Builder builder = 
Windmill.GetWorkResponse.newBuilder();
     Windmill.WorkItem.Builder builder = Windmill.WorkItem.newBuilder();
     builder.setKey(DEFAULT_KEY_BYTES);
     builder.setShardingKey(DEFAULT_SHARDING_KEY);
@@ -849,7 +848,7 @@ public class StreamingDataflowWorkerTest {
             streamingDataflowWorkerTestParams.clock(),
             streamingDataflowWorkerTestParams.executorSupplier(),
             streamingDataflowWorkerTestParams.localRetryTimeoutMs(),
-            streamingDataflowWorkerTestParams.maxWorkItemCommitBytes());
+            streamingDataflowWorkerTestParams.operationalLimits());
     this.computationStateCache = worker.getComputationStateCache();
     return worker;
   }
@@ -1216,7 +1215,8 @@ public class StreamingDataflowWorkerTest {
         makeWorker(
             defaultWorkerParams()
                 .setInstructions(instructions)
-                .setMaxWorkItemCommitBytes(1000)
+                .setOperationalLimits(
+                    
OperationalLimits.builder().setMaxWorkItemCommitBytes(1000).build())
                 .publishCounters()
                 .build());
     worker.start();
@@ -1271,6 +1271,80 @@ public class StreamingDataflowWorkerTest {
     assertTrue(foundErrors);
   }
 
+  @Test
+  public void testOutputKeyTooLargeException() throws Exception {
+    KvCoder<String, String> kvCoder = KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of());
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(kvCoder),
+            makeDoFnInstruction(new ExceptionCatchingFn(), 0, kvCoder),
+            makeSinkInstruction(kvCoder, 1));
+
+    server.setExpectedExceptionCount(1);
+
+    StreamingDataflowWorker worker =
+        makeWorker(
+            defaultWorkerParams()
+                .setInstructions(instructions)
+                .setOperationalLimits(
+                    OperationalLimits.builder()
+                        .setMaxOutputKeyBytes(15)
+                        .setThrowExceptionOnLargeOutput(true)
+                        .build())
+                .build());
+    worker.start();
+
+    // This large key will cause the ExceptionCatchingFn to throw an 
exception, which will then
+    // cause it to output a smaller key.
+    String bigKey = "some_much_too_large_output_key";
+    server.whenGetWorkCalled().thenReturn(makeInput(1, 0, bigKey, 
DEFAULT_SHARDING_KEY));
+    server.waitForEmptyWorkQueue();
+
+    Map<Long, Windmill.WorkItemCommitRequest> result = 
server.waitForAndGetCommits(1);
+    assertEquals(1, result.size());
+    assertEquals(
+        makeExpectedOutput(1, 0, bigKey, DEFAULT_SHARDING_KEY, 
"smaller_key").build(),
+        removeDynamicFields(result.get(1L)));
+  }
+
+  @Test
+  public void testOutputValueTooLargeException() throws Exception {
+    KvCoder<String, String> kvCoder = KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of());
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(kvCoder),
+            makeDoFnInstruction(new ExceptionCatchingFn(), 0, kvCoder),
+            makeSinkInstruction(kvCoder, 1));
+
+    server.setExpectedExceptionCount(1);
+
+    StreamingDataflowWorker worker =
+        makeWorker(
+            defaultWorkerParams()
+                .setInstructions(instructions)
+                .setOperationalLimits(
+                    OperationalLimits.builder()
+                        .setMaxOutputValueBytes(15)
+                        .setThrowExceptionOnLargeOutput(true)
+                        .build())
+                .build());
+    worker.start();
+
+    // The first time processing will have value 
"data1_a_bunch_more_data_output", which is above
+    // the limit. After throwing the exception, the output should be just 
"data1", which is small
+    // enough.
+    server.whenGetWorkCalled().thenReturn(makeInput(1, 0, "key", 
DEFAULT_SHARDING_KEY));
+    server.waitForEmptyWorkQueue();
+
+    Map<Long, Windmill.WorkItemCommitRequest> result = 
server.waitForAndGetCommits(1);
+    assertEquals(1, result.size());
+    assertEquals(
+        makeExpectedOutput(1, 0, "key", DEFAULT_SHARDING_KEY, 
"smaller_key").build(),
+        removeDynamicFields(result.get(1L)));
+  }
+
   @Test
   public void testKeyChange() throws Exception {
     KvCoder<String, String> kvCoder = KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of());
@@ -4021,6 +4095,18 @@ public class StreamingDataflowWorkerTest {
     }
   }
 
+  static class ExceptionCatchingFn extends DoFn<KV<String, String>, KV<String, 
String>> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      try {
+        c.output(KV.of(c.element().getKey(), c.element().getValue() + 
"_a_bunch_more_data_output"));
+      } catch (Exception e) {
+        c.output(KV.of("smaller_key", c.element().getValue()));
+      }
+    }
+  }
+
   static class ChangeKeysFn extends DoFn<KV<String, String>, KV<String, 
String>> {
 
     @ProcessElement
@@ -4433,7 +4519,7 @@ public class StreamingDataflowWorkerTest {
           .setLocalRetryTimeoutMs(-1)
           .setPublishCounters(false)
           .setClock(Instant::now)
-          .setMaxWorkItemCommitBytes(Integer.MAX_VALUE);
+          .setOperationalLimits(OperationalLimits.builder().build());
     }
 
     abstract ImmutableMap<String, String> stateNameMappings();
@@ -4450,7 +4536,7 @@ public class StreamingDataflowWorkerTest {
 
     abstract int localRetryTimeoutMs();
 
-    abstract int maxWorkItemCommitBytes();
+    abstract OperationalLimits operationalLimits();
 
     @AutoValue.Builder
     abstract static class Builder {
@@ -4484,7 +4570,7 @@ public class StreamingDataflowWorkerTest {
 
       abstract Builder setLocalRetryTimeoutMs(int value);
 
-      abstract Builder setMaxWorkItemCommitBytes(int maxWorkItemCommitBytes);
+      abstract Builder setOperationalLimits(OperationalLimits 
operationalLimits);
 
       abstract StreamingDataflowWorkerTestParams build();
     }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 6c46bda5acf..7988212efde 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -157,6 +157,7 @@ public class StreamingModeExecutionContextTest {
             Watermarks.builder().setInputDataWatermark(new 
Instant(1000)).build()),
         stateReader,
         sideInputStateFetcher,
+        OperationalLimits.builder().build(),
         outputBuilder);
 
     TimerInternals timerInternals = stepContext.timerInternals();
@@ -206,6 +207,7 @@ public class StreamingModeExecutionContextTest {
             Watermarks.builder().setInputDataWatermark(new 
Instant(1000)).build()),
         stateReader,
         sideInputStateFetcher,
+        OperationalLimits.builder().build(),
         outputBuilder);
     TimerInternals timerInternals = stepContext.timerInternals();
     
assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime()));
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index 5d8ebd53400..c79d947ca22 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -634,6 +634,7 @@ public class WorkerCustomSourcesTest {
               Watermarks.builder().setInputDataWatermark(new 
Instant(0)).build()),
           mock(WindmillStateReader.class),
           mock(SideInputStateFetcher.class),
+          OperationalLimits.builder().build(),
           Windmill.WorkItemCommitRequest.newBuilder());
 
       @SuppressWarnings({"unchecked", "rawtypes"})
@@ -1000,7 +1001,7 @@ public class WorkerCustomSourcesTest {
             Work.createProcessingContext(
                 COMPUTATION_ID,
                 (a, b) -> Windmill.KeyedGetDataResponse.getDefaultInstance(),
-                gnored -> {}),
+                ignored -> {}),
             Instant::now,
             Collections.emptyList());
     context.start(
@@ -1008,6 +1009,7 @@ public class WorkerCustomSourcesTest {
         dummyWork,
         mock(WindmillStateReader.class),
         mock(SideInputStateFetcher.class),
+        OperationalLimits.builder().build(),
         Windmill.WorkItemCommitRequest.newBuilder());
 
     @SuppressWarnings({"unchecked", "rawtypes"})


Reply via email to