scwhittle commented on code in PR #31822:
URL: https://github.com/apache/beam/pull/31822#discussion_r1688858624


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -187,6 +194,18 @@ private static StreamingEnginePipelineConfig createPipelineConfig(StreamingConfi
       pipelineConfig.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue());
     }
 
+    if (shouldPerformOutputSizeChecks && config.getOperationalLimits() != null) {

Review Comment:
   Seems like we could remove shouldPerformOutputSizeChecks and just perform 
the checks whenever a limit is not the max, if getOperationalLimits returns a 
default of max values.
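
   A rough sketch of the idea, assuming a hypothetical `Limits` holder whose
default is `Long.MAX_VALUE` (the class and method names here are illustrative,
not the ones in the PR). With a max-valued default the comparison can never
fail, so no separate boolean is needed:

```java
final class DefaultMaxLimitsSketch {
  // Hypothetical stand-in for the operational limits carried in the config.
  static final class Limits {
    final long maxOutputKeyBytes;

    Limits(long maxOutputKeyBytes) {
      this.maxOutputKeyBytes = maxOutputKeyBytes;
    }

    // Assumption: an unset limit is represented by the maximum long value.
    static Limits defaults() {
      return new Limits(Long.MAX_VALUE);
    }
  }

  // No "should check" flag: with the default limit no key can exceed it,
  // so the check is effectively disabled until a real limit is configured.
  static void checkKeySize(long keyBytes, Limits limits) {
    if (keyBytes > limits.maxOutputKeyBytes) {
      throw new IllegalArgumentException(
          "Output key of " + keyBytes + " bytes exceeds limit of " + limits.maxOutputKeyBytes);
    }
  }
}
```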



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -395,12 +402,21 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
         workFailureProcessor,
         streamingCounters,
         memoryMonitor,
-        maxWorkItemCommitBytes,
+        operationalLimits,
         configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
         executorSupplier,
         stageInfo);
   }
 
+  public static class OperationalLimits {
+    // Maximum size of a commit from a single work item.
+    public long maxWorkItemCommitBytes;

Review Comment:
   Could use an AutoBuilder to simplify creating the immutable object:
   
   ```
   class OperationalLimits {
     final long maxWorkItemCommitBytes;
     OperationalLimits(long maxWorkItemCommitBytes, ...) { ...}
   
     @AutoBuilder(ofClass = OperationalLimits.class)
     public interface Builder {
       Builder setMaxWorkItemCommitBytes(long bytes);
       ...
       OperationalLimits build();
     }
   
     static Builder builder() {
       return AutoBuilder_Builder().setMaxWorkItemCommitBytes(default).set....();
     }
   }
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+/** Indicates that an output element was too large. */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class OutputTooLargeException extends RuntimeException {
+  public OutputTooLargeException(String reason) {
+    super(reason);
+  }
+
+  /** Returns whether an exception was caused by a {@link OutputTooLargeException}. */
+  public static boolean isOutputTooLargeException(Throwable t) {

Review Comment:
   How about causedByOutputTooLargeException?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -572,10 +597,32 @@ static StreamingDataflowWorker forTesting(
   private static void onPipelineConfig(
       StreamingEnginePipelineConfig config,
       Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
-      AtomicInteger maxWorkItemCommitBytes) {
-    if (config.maxWorkItemCommitBytes() != maxWorkItemCommitBytes.get()) {
-      LOG.info("Setting maxWorkItemCommitBytes to {}", maxWorkItemCommitBytes);
-      maxWorkItemCommitBytes.set((int) config.maxWorkItemCommitBytes());
+      AtomicReference<OperationalLimits> operationalLimits) {

Review Comment:
   Could take a Consumer<OperationalLimits> and bind operationalLimits::set to 
it. This method doesn't need to read the limits, so the more restrictive 
interface is clearer.
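
   Roughly like this, as a sketch only: `Limits` stands in for
OperationalLimits, and the simplified `onPipelineConfig` signature is an
assumption made to keep the example self-contained.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class ConsumerBindingSketch {
  // Hypothetical stand-in for OperationalLimits.
  static final class Limits {
    final long maxWorkItemCommitBytes;

    Limits(long maxWorkItemCommitBytes) {
      this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
    }
  }

  // The callback only receives a write-only view of the limits, so it is
  // clear from the signature that it never reads the current value.
  static void onPipelineConfig(long configuredMaxCommitBytes, Consumer<Limits> limitsConsumer) {
    limitsConsumer.accept(new Limits(configuredMaxCommitBytes));
  }

  // The owner keeps the AtomicReference and hands out only its setter.
  static Limits applyConfig(long configuredMaxCommitBytes) {
    AtomicReference<Limits> operationalLimits =
        new AtomicReference<>(new Limits(Long.MAX_VALUE));
    onPipelineConfig(configuredMaxCommitBytes, operationalLimits::set);
    return operationalLimits.get();
  }
}
```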



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -81,7 +82,7 @@ public final class StreamingWorkScheduler {
   private final HotKeyLogger hotKeyLogger;
   private final ConcurrentMap<String, StageInfo> stageInfoMap;
   private final DataflowExecutionStateSampler sampler;
-  private final AtomicInteger maxWorkItemCommitBytes;
+  private final AtomicReference<StreamingDataflowWorker.OperationalLimits> operationalLimits;

Review Comment:
   Let's move OperationalLimits to a separate file so it isn't a nested class.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -395,12 +402,21 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
         workFailureProcessor,
         streamingCounters,
         memoryMonitor,
-        maxWorkItemCommitBytes,
+        operationalLimits,
         configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
         executorSupplier,
         stageInfo);
   }
 
+  public static class OperationalLimits {
+    // Maximum size of a commit from a single work item.
+    public long maxWorkItemCommitBytes;

Review Comment:
   I think that this class should be immutable (i.e. all members final), and 
then you make a new instance of OperationalLimits whenever you want to modify 
it.
   
   As is, there are data races: the reference is read from the AtomicReference 
atomically, but examining its members afterwards is not atomic, and the 
background thread could simultaneously modify the same object.
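
   To illustrate, a minimal sketch of the immutable-snapshot pattern. The
class names are hypothetical; the two fields are the limits that appear in
this PR's diffs.

```java
import java.util.concurrent.atomic.AtomicReference;

final class ImmutableLimitsSketch {
  // All fields are final, so a published instance can never change.
  static final class Limits {
    final long maxWorkItemCommitBytes;
    final long maxOutputKeyBytes;

    Limits(long maxWorkItemCommitBytes, long maxOutputKeyBytes) {
      this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
      this.maxOutputKeyBytes = maxOutputKeyBytes;
    }
  }

  private final AtomicReference<Limits> current =
      new AtomicReference<>(new Limits(Long.MAX_VALUE, Long.MAX_VALUE));

  // Writer (e.g. a background config thread): swap in a whole new object.
  void update(long maxWorkItemCommitBytes, long maxOutputKeyBytes) {
    current.set(new Limits(maxWorkItemCommitBytes, maxOutputKeyBytes));
  }

  // Reader: take one snapshot, then read as many fields as needed from it.
  // Both comparisons see values that were installed together.
  boolean withinLimits(long commitBytes, long keyBytes) {
    Limits snapshot = current.get();
    return commitBytes <= snapshot.maxWorkItemCommitBytes
        && keyBytes <= snapshot.maxOutputKeyBytes;
  }
}
```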



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -572,10 +597,32 @@ static StreamingDataflowWorker forTesting(
   private static void onPipelineConfig(
       StreamingEnginePipelineConfig config,
       Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
-      AtomicInteger maxWorkItemCommitBytes) {
-    if (config.maxWorkItemCommitBytes() != maxWorkItemCommitBytes.get()) {
-      LOG.info("Setting maxWorkItemCommitBytes to {}", maxWorkItemCommitBytes);
-      maxWorkItemCommitBytes.set((int) config.maxWorkItemCommitBytes());
+      AtomicReference<OperationalLimits> operationalLimits) {
+    if (config.maxWorkItemCommitBytes() != operationalLimits.get().maxWorkItemCommitBytes) {

Review Comment:
   Create an OperationalLimits from all of the set values and then install it 
with a single set.
   
   A missing config value should probably revert to the default rather than 
stay at the last set value. But if you are creating a new OperationalLimits 
each time, I think that would be the case anyway.
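
   Something along these lines, as a sketch only: it assumes a missing config
value shows up as null and that the default is `Long.MAX_VALUE`, neither of
which is confirmed by the PR. All names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

final class SingleSetSketch {
  // Assumption: "no limit configured" is modelled as the maximum long value.
  static final long DEFAULT_LIMIT = Long.MAX_VALUE;

  static final class Limits {
    final long maxWorkItemCommitBytes;
    final long maxOutputKeyBytes;

    Limits(long maxWorkItemCommitBytes, long maxOutputKeyBytes) {
      this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
      this.maxOutputKeyBytes = maxOutputKeyBytes;
    }
  }

  // Builds a complete object from the config; a null value falls back to
  // the default instead of inheriting whatever was set last time.
  static Limits fromConfig(Long maxWorkItemCommitBytes, Long maxOutputKeyBytes) {
    return new Limits(
        maxWorkItemCommitBytes != null ? maxWorkItemCommitBytes : DEFAULT_LIMIT,
        maxOutputKeyBytes != null ? maxOutputKeyBytes : DEFAULT_LIMIT);
  }

  // One set() installs every limit at once.
  static void onPipelineConfig(
      Long maxWorkItemCommitBytes, Long maxOutputKeyBytes, AtomicReference<Limits> limits) {
    limits.set(fromConfig(maxWorkItemCommitBytes, maxOutputKeyBytes));
  }
}
```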



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -1271,6 +1273,66 @@ public void testKeyCommitTooLargeException() throws Exception {
     assertTrue(foundErrors);
   }
 
+  @Test
+  public void testOutputKeyTooLargeException() throws Exception {

Review Comment:
   Either add a new test, or extend these tests, to verify that values below 
the limit go through successfully.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -177,11 +190,15 @@ public void start(
       Work work,
       WindmillStateReader stateReader,
       SideInputStateFetcher sideInputStateFetcher,
+      long maxOutputKeyBytes,

Review Comment:
   What about taking OperationalLimits instead of plumbing each limit through 
separately?


