This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 89d5e2f2961 Validate commits in StreamingDataflowWorker (#31822)
89d5e2f2961 is described below
commit 89d5e2f29615c1d4dba41c42a2161d5c5d5f39a8
Author: Andrew Crites <[email protected]>
AuthorDate: Tue Jul 30 09:17:21 2024 -0700
Validate commits in StreamingDataflowWorker (#31822)
* Grabs operational limits from streaming config and plumbs them to
WindmillSink, which can then throw an exception or log an error if outputs are
too large.
---
.../runners/dataflow/worker/OperationalLimits.java | 64 ++++++++++++++
.../dataflow/worker/OutputTooLargeException.java | 38 +++++++++
.../dataflow/worker/StreamingDataflowWorker.java | 44 ++++++----
.../worker/StreamingModeExecutionContext.java | 18 ++++
.../beam/runners/dataflow/worker/WindmillSink.java | 25 ++++++
.../worker/streaming/ComputationWorkExecutor.java | 7 +-
.../StreamingEngineComputationConfigFetcher.java | 16 +++-
.../config/StreamingEnginePipelineConfig.java | 10 +++
.../work/processing/StreamingWorkScheduler.java | 22 +++--
.../worker/StreamingDataflowWorkerTest.java | 98 ++++++++++++++++++++--
.../worker/StreamingModeExecutionContextTest.java | 2 +
.../dataflow/worker/WorkerCustomSourcesTest.java | 4 +-
12 files changed, 311 insertions(+), 37 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
new file mode 100644
index 00000000000..e9ee8f39cba
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import com.google.auto.value.AutoBuilder;
+
+/** Operational limits imposed by the backend on work item commits and output elements. */
+public class OperationalLimits {
+  /** Maximum size of a commit from a single work item. */
+  public final long maxWorkItemCommitBytes;
+  /** Maximum size of a single output element's serialized key. */
+  public final long maxOutputKeyBytes;
+  /** Maximum size of a single output element's serialized value. */
+  public final long maxOutputValueBytes;
+  /** Whether to throw an exception when processing output that violates any of these limits. */
+  public final boolean throwExceptionOnLargeOutput;
+
+  OperationalLimits(
+      long maxWorkItemCommitBytes,
+      long maxOutputKeyBytes,
+      long maxOutputValueBytes,
+      boolean throwExceptionOnLargeOutput) {
+    this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
+    this.maxOutputKeyBytes = maxOutputKeyBytes;
+    this.maxOutputValueBytes = maxOutputValueBytes;
+    this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput;
+  }
+
+  @AutoBuilder(ofClass = OperationalLimits.class)
+  public interface Builder {
+    Builder setMaxWorkItemCommitBytes(long bytes);
+
+    Builder setMaxOutputKeyBytes(long bytes);
+
+    Builder setMaxOutputValueBytes(long bytes);
+
+    Builder setThrowExceptionOnLargeOutput(boolean shouldThrow);
+
+    OperationalLimits build();
+  }
+
+  public static Builder builder() {
+    return new AutoBuilder_OperationalLimits_Builder()
+        .setMaxWorkItemCommitBytes(Long.MAX_VALUE)
+        .setMaxOutputKeyBytes(Long.MAX_VALUE)
+        .setMaxOutputValueBytes(Long.MAX_VALUE)
+        .setThrowExceptionOnLargeOutput(false);
+  }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java
new file mode 100644
index 00000000000..9f4b413841c
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Indicates that an output element's serialized key or value exceeded a size limit. */
+public class OutputTooLargeException extends RuntimeException {
+  public OutputTooLargeException(String reason) {
+    super(reason);
+  }
+
+  /** Returns true if {@code t} or any of its causes is an {@link OutputTooLargeException}. */
+  public static boolean isCausedByOutputTooLargeException(@Nullable Throwable t) {
+    while (t != null) {
+      if (t instanceof OutputTooLargeException) {
+        return true;
+      }
+      t = t.getCause();
+    }
+    return false;
+  }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 718d93830c4..a07bbfa7f5f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -35,7 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -177,7 +177,7 @@ public class StreamingDataflowWorker {
WorkFailureProcessor workFailureProcessor,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
- AtomicInteger maxWorkItemCommitBytes,
+ AtomicReference<OperationalLimits> operationalLimits,
GrpcWindmillStreamFactory windmillStreamFactory,
Function<String, ScheduledExecutorService> executorSupplier,
ConcurrentMap<String, StageInfo> stageInfoMap) {
@@ -296,7 +296,7 @@ public class StreamingDataflowWorker {
streamingCounters,
hotKeyLogger,
sampler,
- maxWorkItemCommitBytes,
+ operationalLimits,
ID_GENERATOR,
stageInfoMap);
@@ -304,7 +304,6 @@ public class StreamingDataflowWorker {
LOG.debug("WindmillServiceEndpoint: {}",
options.getWindmillServiceEndpoint());
LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
- LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes.get());
}
public static StreamingDataflowWorker
fromOptions(DataflowWorkerHarnessOptions options) {
@@ -314,7 +313,8 @@ public class StreamingDataflowWorker {
StreamingCounters streamingCounters = StreamingCounters.create();
WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options,
LOG);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
- AtomicInteger maxWorkItemCommitBytes = new
AtomicInteger(Integer.MAX_VALUE);
+ AtomicReference<OperationalLimits> operationalLimits =
+ new AtomicReference<>(OperationalLimits.builder().build());
WindmillStateCache windmillStateCache =
WindmillStateCache.builder()
.setSizeMb(options.getWorkerCacheMb())
@@ -332,7 +332,7 @@ public class StreamingDataflowWorker {
createConfigFetcherComputationStateCacheAndWindmillClient(
options,
dataflowServiceClient,
- maxWorkItemCommitBytes,
+ operationalLimits,
windmillStreamFactoryBuilder,
configFetcher ->
ComputationStateCache.create(
@@ -390,7 +390,7 @@ public class StreamingDataflowWorker {
workFailureProcessor,
streamingCounters,
memoryMonitor,
- maxWorkItemCommitBytes,
+ operationalLimits,
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
executorSupplier,
stageInfo);
@@ -406,7 +406,7 @@ public class StreamingDataflowWorker {
createConfigFetcherComputationStateCacheAndWindmillClient(
DataflowWorkerHarnessOptions options,
WorkUnitClient dataflowServiceClient,
- AtomicInteger maxWorkItemCommitBytes,
+ AtomicReference<OperationalLimits> operationalLimits,
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
Function<ComputationConfig.Fetcher, ComputationStateCache>
computationStateCacheFactory) {
ComputationConfig.Fetcher configFetcher;
@@ -422,8 +422,9 @@ public class StreamingDataflowWorker {
config ->
onPipelineConfig(
config,
+ options,
dispatcherClient::consumeWindmillDispatcherEndpoints,
- maxWorkItemCommitBytes));
+ operationalLimits::set));
computationStateCache =
computationStateCacheFactory.apply(configFetcher);
windmillStreamFactory =
windmillStreamFactoryBuilder
@@ -469,9 +470,9 @@ public class StreamingDataflowWorker {
Supplier<Instant> clock,
Function<String, ScheduledExecutorService> executorSupplier,
int localRetryTimeoutMs,
- int maxWorkItemCommitBytesOverrides) {
+ OperationalLimits limits) {
ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
- AtomicInteger maxWorkItemCommitBytes = new
AtomicInteger(maxWorkItemCommitBytesOverrides);
+ AtomicReference<OperationalLimits> operationalLimits = new
AtomicReference<>(limits);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
WindmillStateCache stateCache =
WindmillStateCache.builder()
@@ -488,8 +489,9 @@ public class StreamingDataflowWorker {
config ->
onPipelineConfig(
config,
+ options,
windmillServer::setWindmillServiceEndpoints,
- maxWorkItemCommitBytes))
+ operationalLimits::set))
: new
StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
ConcurrentMap<String, String> stateNameMap =
new ConcurrentHashMap<>(prePopulatedStateNameMappings);
@@ -557,7 +559,7 @@ public class StreamingDataflowWorker {
workFailureProcessor,
streamingCounters,
memoryMonitor,
- maxWorkItemCommitBytes,
+ operationalLimits,
options.isEnableStreamingEngine()
? windmillStreamFactory
.setHealthCheckIntervalMillis(
@@ -570,12 +572,18 @@ public class StreamingDataflowWorker {
private static void onPipelineConfig(
StreamingEnginePipelineConfig config,
+ DataflowWorkerHarnessOptions options,
Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
- AtomicInteger maxWorkItemCommitBytes) {
- if (config.maxWorkItemCommitBytes() != maxWorkItemCommitBytes.get()) {
- LOG.info("Setting maxWorkItemCommitBytes to {}", maxWorkItemCommitBytes);
- maxWorkItemCommitBytes.set((int) config.maxWorkItemCommitBytes());
- }
+ Consumer<OperationalLimits> operationalLimits) {
+
+ operationalLimits.accept(
+ OperationalLimits.builder()
+ .setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes())
+ .setMaxOutputKeyBytes(config.maxOutputKeyBytes())
+ .setMaxOutputValueBytes(config.maxOutputValueBytes())
+ .setThrowExceptionOnLargeOutput(
+ DataflowRunner.hasExperiment(options,
"throw_exceptions_on_large_output"))
+ .build());
if (!config.windmillServiceEndpoints().isEmpty()) {
consumeWindmillServiceEndpoints.accept(config.windmillServiceEndpoints());
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index dd6353060ab..a594dbb1e0f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -129,6 +129,10 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
private Work work;
private WindmillComputationKey computationKey;
private SideInputStateFetcher sideInputStateFetcher;
+ // OperationalLimits is updated in start() because a
StreamingModeExecutionContext can
+ // be used for processing many work items and these values can change during
the context's
+ // lifetime. start() is called for each work item.
+ private OperationalLimits operationalLimits;
private Windmill.WorkItemCommitRequest.Builder outputBuilder;
/**
@@ -168,6 +172,18 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
return backlogBytes;
}
+ public long getMaxOutputKeyBytes() {
+ return operationalLimits.maxOutputKeyBytes;
+ }
+
+ public long getMaxOutputValueBytes() {
+ return operationalLimits.maxOutputValueBytes;
+ }
+
+ public boolean throwExceptionsForLargeOutput() {
+ return operationalLimits.throwExceptionOnLargeOutput;
+ }
+
public boolean workIsFailed() {
return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
}
@@ -177,11 +193,13 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
+ OperationalLimits operationalLimits,
Windmill.WorkItemCommitRequest.Builder outputBuilder) {
this.key = key;
this.work = work;
this.computationKey = WindmillComputationKey.create(computationId,
work.getShardedKey());
this.sideInputStateFetcher = sideInputStateFetcher;
+ this.operationalLimits = operationalLimits;
this.outputBuilder = outputBuilder;
this.sideInputCache.clear();
clearSinkFullHint();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index 1f26572941a..78d0c6b4550 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -44,6 +44,8 @@ import
org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
@@ -54,6 +56,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
private final Coder<T> valueCoder;
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
private StreamingModeExecutionContext context;
+ private static final Logger LOG =
LoggerFactory.getLogger(WindmillSink.class);
WindmillSink(
String destinationName,
@@ -172,6 +175,28 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
key = context.getSerializedKey();
value = encode(valueCoder, data.getValue());
}
+ if (key.size() > context.getMaxOutputKeyBytes()) {
+ if (context.throwExceptionsForLargeOutput()) {
+ throw new OutputTooLargeException("Key too large: " + key.size());
+ } else {
+ LOG.error(
+ "Trying to output too large key with size "
+ + key.size()
+ + ". Limit is "
+ + context.getMaxOutputKeyBytes());
+ }
+ }
+ if (value.size() > context.getMaxOutputValueBytes()) {
+ if (context.throwExceptionsForLargeOutput()) {
+ throw new OutputTooLargeException("Value too large: " +
value.size());
+ } else {
+ LOG.error(
+ "Trying to output too large value with size "
+ + value.size()
+ + ". Limit is "
+ + context.getMaxOutputValueBytes());
+ }
+ }
Windmill.KeyedMessageBundle.Builder keyedOutput = productionMap.get(key);
if (keyedOutput == null) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
index dd34e85bc93..8a00194887d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
@@ -24,6 +24,7 @@ import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
import
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
@@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory;
* @implNote Once closed, it cannot be reused.
*/
// TODO(m-trieu): See if this can be combined/cleaned up with
StreamingModeExecutionContext as the
-// seperation of responsibilities are unclear.
+// separation of responsibilities are unclear.
@AutoValue
@Internal
@NotThreadSafe
@@ -72,9 +73,11 @@ public abstract class ComputationWorkExecutor {
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
+ OperationalLimits operationalLimits,
Windmill.WorkItemCommitRequest.Builder outputBuilder)
throws Exception {
- context().start(key, work, stateReader, sideInputStateFetcher,
outputBuilder);
+ context()
+ .start(key, work, stateReader, sideInputStateFetcher,
operationalLimits, outputBuilder);
workExecutor().execute();
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
index 51d1507af5f..850e8c3f24b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
@@ -157,7 +157,7 @@ public final class StreamingEngineComputationConfigFetcher
implements Computatio
}
}
- private static StreamingEnginePipelineConfig
createPipelineConfig(StreamingConfigTask config) {
+ private StreamingEnginePipelineConfig
createPipelineConfig(StreamingConfigTask config) {
StreamingEnginePipelineConfig.Builder pipelineConfig =
StreamingEnginePipelineConfig.builder();
if (config.getUserStepToStateFamilyNameMap() != null) {
pipelineConfig.setUserStepToStateFamilyNameMap(config.getUserStepToStateFamilyNameMap());
@@ -187,6 +187,18 @@ public final class StreamingEngineComputationConfigFetcher
implements Computatio
pipelineConfig.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue());
}
+    if (config.getOperationalLimits() != null) {
+      // Limits are boxed Longs that may be unset (null); check before unboxing to avoid an NPE.
+      Long maxKeyBytes = config.getOperationalLimits().getMaxKeyBytes();
+      if (maxKeyBytes != null && maxKeyBytes > 0 && maxKeyBytes <= Integer.MAX_VALUE) {
+        pipelineConfig.setMaxOutputKeyBytes(maxKeyBytes);
+      }
+      Long maxValueBytes = config.getOperationalLimits().getMaxProductionOutputBytes();
+      if (maxValueBytes != null && maxValueBytes > 0 && maxValueBytes <= Integer.MAX_VALUE) {
+        pipelineConfig.setMaxOutputValueBytes(maxValueBytes);
+      }
+    }
+
return pipelineConfig.build();
}
@@ -273,7 +285,7 @@ public final class StreamingEngineComputationConfigFetcher
implements Computatio
private Optional<StreamingEnginePipelineConfig> fetchGlobalConfig() {
return
fetchConfigWithRetry(dataflowServiceClient::getGlobalStreamingConfigWorkItem)
- .map(StreamingEngineComputationConfigFetcher::createPipelineConfig);
+ .map(config -> createPipelineConfig(config));
}
@FunctionalInterface
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
index b5b761ada70..8f1ff93f6a4 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
@@ -34,12 +34,18 @@ public abstract class StreamingEnginePipelineConfig {
public static StreamingEnginePipelineConfig.Builder builder() {
return new AutoValue_StreamingEnginePipelineConfig.Builder()
.setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES)
+ .setMaxOutputKeyBytes(Long.MAX_VALUE)
+ .setMaxOutputValueBytes(Long.MAX_VALUE)
.setUserStepToStateFamilyNameMap(new HashMap<>())
.setWindmillServiceEndpoints(ImmutableSet.of());
}
public abstract long maxWorkItemCommitBytes();
+ public abstract long maxOutputKeyBytes();
+
+ public abstract long maxOutputValueBytes();
+
public abstract Map<String, String> userStepToStateFamilyNameMap();
public abstract ImmutableSet<HostAndPort> windmillServiceEndpoints();
@@ -48,6 +54,10 @@ public abstract class StreamingEnginePipelineConfig {
public abstract static class Builder {
public abstract Builder setMaxWorkItemCommitBytes(long value);
+ public abstract Builder setMaxOutputKeyBytes(long value);
+
+ public abstract Builder setMaxOutputValueBytes(long value);
+
public abstract Builder setUserStepToStateFamilyNameMap(Map<String,
String> value);
public abstract Builder
setWindmillServiceEndpoints(ImmutableSet<HostAndPort> value);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index 334ab8efeae..e9ffa982925 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -23,7 +23,7 @@ import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
@@ -31,6 +31,7 @@ import
org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory;
import org.apache.beam.runners.dataflow.worker.HotKeyLogger;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
import org.apache.beam.runners.dataflow.worker.ReaderCache;
import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException;
import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
@@ -81,7 +82,7 @@ public final class StreamingWorkScheduler {
private final HotKeyLogger hotKeyLogger;
private final ConcurrentMap<String, StageInfo> stageInfoMap;
private final DataflowExecutionStateSampler sampler;
- private final AtomicInteger maxWorkItemCommitBytes;
+ private final AtomicReference<OperationalLimits> operationalLimits;
public StreamingWorkScheduler(
DataflowWorkerHarnessOptions options,
@@ -95,7 +96,7 @@ public final class StreamingWorkScheduler {
HotKeyLogger hotKeyLogger,
ConcurrentMap<String, StageInfo> stageInfoMap,
DataflowExecutionStateSampler sampler,
- AtomicInteger maxWorkItemCommitBytes) {
+ AtomicReference<OperationalLimits> operationalLimits) {
this.options = options;
this.clock = clock;
this.computationWorkExecutorFactory = computationWorkExecutorFactory;
@@ -107,7 +108,7 @@ public final class StreamingWorkScheduler {
this.hotKeyLogger = hotKeyLogger;
this.stageInfoMap = stageInfoMap;
this.sampler = sampler;
- this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
+ this.operationalLimits = operationalLimits;
}
public static StreamingWorkScheduler create(
@@ -123,7 +124,7 @@ public final class StreamingWorkScheduler {
StreamingCounters streamingCounters,
HotKeyLogger hotKeyLogger,
DataflowExecutionStateSampler sampler,
- AtomicInteger maxWorkItemCommitBytes,
+ AtomicReference<OperationalLimits> operationalLimits,
IdGenerator idGenerator,
ConcurrentMap<String, StageInfo> stageInfoMap) {
ComputationWorkExecutorFactory computationWorkExecutorFactory =
@@ -148,7 +149,7 @@ public final class StreamingWorkScheduler {
hotKeyLogger,
stageInfoMap,
sampler,
- maxWorkItemCommitBytes);
+ operationalLimits);
}
private static long computeShuffleBytesRead(Windmill.WorkItem workItem) {
@@ -292,7 +293,7 @@ public final class StreamingWorkScheduler {
Windmill.WorkItemCommitRequest commitRequest,
String computationId,
Windmill.WorkItem workItem) {
- int byteLimit = maxWorkItemCommitBytes.get();
+ long byteLimit = operationalLimits.get().maxWorkItemCommitBytes;
int commitSize = commitRequest.getSerializedSize();
int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;
@@ -375,7 +376,12 @@ public final class StreamingWorkScheduler {
// Blocks while executing work.
computationWorkExecutor.executeWork(
- executionKey, work, stateReader, localSideInputStateFetcher,
outputBuilder);
+ executionKey,
+ work,
+ stateReader,
+ localSideInputStateFetcher,
+ operationalLimits.get(),
+ outputBuilder);
if (work.isFailed()) {
throw new WorkItemCancelledException(workItem.getShardingKey());
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index 52bc61e5991..8a4369fdbd8 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -549,7 +549,6 @@ public class StreamingDataflowWorkerTest {
List<Long> inputs,
List<Timer> timers)
throws Exception {
- // Windmill.GetWorkResponse.Builder builder =
Windmill.GetWorkResponse.newBuilder();
Windmill.WorkItem.Builder builder = Windmill.WorkItem.newBuilder();
builder.setKey(DEFAULT_KEY_BYTES);
builder.setShardingKey(DEFAULT_SHARDING_KEY);
@@ -849,7 +848,7 @@ public class StreamingDataflowWorkerTest {
streamingDataflowWorkerTestParams.clock(),
streamingDataflowWorkerTestParams.executorSupplier(),
streamingDataflowWorkerTestParams.localRetryTimeoutMs(),
- streamingDataflowWorkerTestParams.maxWorkItemCommitBytes());
+ streamingDataflowWorkerTestParams.operationalLimits());
this.computationStateCache = worker.getComputationStateCache();
return worker;
}
@@ -1216,7 +1215,8 @@ public class StreamingDataflowWorkerTest {
makeWorker(
defaultWorkerParams()
.setInstructions(instructions)
- .setMaxWorkItemCommitBytes(1000)
+ .setOperationalLimits(
+
OperationalLimits.builder().setMaxWorkItemCommitBytes(1000).build())
.publishCounters()
.build());
worker.start();
@@ -1271,6 +1271,80 @@ public class StreamingDataflowWorkerTest {
assertTrue(foundErrors);
}
+  @Test
+  public void testOutputKeyTooLargeException() throws Exception {
+    KvCoder<String, String> kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(kvCoder),
+            makeDoFnInstruction(new ExceptionCatchingFn(), 0, kvCoder),
+            makeSinkInstruction(kvCoder, 1));
+
+    server.setExpectedExceptionCount(1);
+
+    StreamingDataflowWorker worker =
+        makeWorker(
+            defaultWorkerParams()
+                .setInstructions(instructions)
+                .setOperationalLimits(
+                    OperationalLimits.builder()
+                        .setMaxOutputKeyBytes(15)
+                        .setThrowExceptionOnLargeOutput(true)
+                        .build())
+                .build());
+    worker.start();
+
+    // This key exceeds the 15-byte limit, so the sink throws when ExceptionCatchingFn outputs
+    // it. The Fn catches the exception and re-outputs the element under "smaller_key" instead.
+    String bigKey = "some_much_too_large_output_key";
+    server.whenGetWorkCalled().thenReturn(makeInput(1, 0, bigKey, DEFAULT_SHARDING_KEY));
+    server.waitForEmptyWorkQueue();
+
+    Map<Long, Windmill.WorkItemCommitRequest> result = server.waitForAndGetCommits(1);
+    assertEquals(1, result.size());
+    assertEquals(
+        makeExpectedOutput(1, 0, bigKey, DEFAULT_SHARDING_KEY, "smaller_key").build(),
+        removeDynamicFields(result.get(1L)));
+  }
+
+  @Test
+  public void testOutputValueTooLargeException() throws Exception {
+    KvCoder<String, String> kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(kvCoder),
+            makeDoFnInstruction(new ExceptionCatchingFn(), 0, kvCoder),
+            makeSinkInstruction(kvCoder, 1));
+
+    server.setExpectedExceptionCount(1);
+
+    StreamingDataflowWorker worker =
+        makeWorker(
+            defaultWorkerParams()
+                .setInstructions(instructions)
+                .setOperationalLimits(
+                    OperationalLimits.builder()
+                        .setMaxOutputValueBytes(15)
+                        .setThrowExceptionOnLargeOutput(true)
+                        .build())
+                .build());
+    worker.start();
+
+    // The first output value, "data1_a_bunch_more_data_output", exceeds the 15-byte limit, so
+    // the sink throws. ExceptionCatchingFn then re-outputs the original value "data1", which is
+    // small enough, under the key "smaller_key".
+    server.whenGetWorkCalled().thenReturn(makeInput(1, 0, "key", DEFAULT_SHARDING_KEY));
+    server.waitForEmptyWorkQueue();
+
+    Map<Long, Windmill.WorkItemCommitRequest> result = server.waitForAndGetCommits(1);
+    assertEquals(1, result.size());
+    assertEquals(
+        makeExpectedOutput(1, 0, "key", DEFAULT_SHARDING_KEY, "smaller_key").build(),
+        removeDynamicFields(result.get(1L)));
+  }
+
@Test
public void testKeyChange() throws Exception {
KvCoder<String, String> kvCoder = KvCoder.of(StringUtf8Coder.of(),
StringUtf8Coder.of());
@@ -4021,6 +4095,18 @@ public class StreamingDataflowWorkerTest {
}
}
+  /**
+   * Test DoFn that appends a long suffix to each element's value before emitting it. If emitting
+   * that enlarged record throws (e.g. because it exceeds a configured operational limit), it falls
+   * back to emitting the original value under the key {@code "smaller_key"}.
+   */
+  static class ExceptionCatchingFn extends DoFn<KV<String, String>, KV<String, String>> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      try {
+        c.output(
+            KV.of(c.element().getKey(), c.element().getValue() + "_a_bunch_more_data_output"));
+      } catch (Exception e) {
+        // Deliberately broad: any failure while emitting the enlarged record triggers the
+        // fallback, letting tests verify recovery after an oversized output is rejected.
+        c.output(KV.of("smaller_key", c.element().getValue()));
+      }
+    }
+  }
+
static class ChangeKeysFn extends DoFn<KV<String, String>, KV<String,
String>> {
@ProcessElement
@@ -4433,7 +4519,7 @@ public class StreamingDataflowWorkerTest {
.setLocalRetryTimeoutMs(-1)
.setPublishCounters(false)
.setClock(Instant::now)
- .setMaxWorkItemCommitBytes(Integer.MAX_VALUE);
+ .setOperationalLimits(OperationalLimits.builder().build());
}
abstract ImmutableMap<String, String> stateNameMappings();
@@ -4450,7 +4536,7 @@ public class StreamingDataflowWorkerTest {
abstract int localRetryTimeoutMs();
- abstract int maxWorkItemCommitBytes();
+ abstract OperationalLimits operationalLimits();
@AutoValue.Builder
abstract static class Builder {
@@ -4484,7 +4570,7 @@ public class StreamingDataflowWorkerTest {
abstract Builder setLocalRetryTimeoutMs(int value);
- abstract Builder setMaxWorkItemCommitBytes(int maxWorkItemCommitBytes);
+ abstract Builder setOperationalLimits(OperationalLimits
operationalLimits);
abstract StreamingDataflowWorkerTestParams build();
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 6c46bda5acf..7988212efde 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -157,6 +157,7 @@ public class StreamingModeExecutionContextTest {
Watermarks.builder().setInputDataWatermark(new
Instant(1000)).build()),
stateReader,
sideInputStateFetcher,
+ OperationalLimits.builder().build(),
outputBuilder);
TimerInternals timerInternals = stepContext.timerInternals();
@@ -206,6 +207,7 @@ public class StreamingModeExecutionContextTest {
Watermarks.builder().setInputDataWatermark(new
Instant(1000)).build()),
stateReader,
sideInputStateFetcher,
+ OperationalLimits.builder().build(),
outputBuilder);
TimerInternals timerInternals = stepContext.timerInternals();
assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime()));
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index 5d8ebd53400..c79d947ca22 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -634,6 +634,7 @@ public class WorkerCustomSourcesTest {
Watermarks.builder().setInputDataWatermark(new
Instant(0)).build()),
mock(WindmillStateReader.class),
mock(SideInputStateFetcher.class),
+ OperationalLimits.builder().build(),
Windmill.WorkItemCommitRequest.newBuilder());
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -1000,7 +1001,7 @@ public class WorkerCustomSourcesTest {
Work.createProcessingContext(
COMPUTATION_ID,
(a, b) -> Windmill.KeyedGetDataResponse.getDefaultInstance(),
- gnored -> {}),
+ ignored -> {}),
Instant::now,
Collections.emptyList());
context.start(
@@ -1008,6 +1009,7 @@ public class WorkerCustomSourcesTest {
dummyWork,
mock(WindmillStateReader.class),
mock(SideInputStateFetcher.class),
+ OperationalLimits.builder().build(),
Windmill.WorkItemCommitRequest.newBuilder());
@SuppressWarnings({"unchecked", "rawtypes"})