This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d6759cf7dd6 [Dataflow Streaming] Add a pipeline option to skip input
elements that cannot be decoded successfully (#37762)
d6759cf7dd6 is described below
commit d6759cf7dd6d27192cb46d3c7d4cfaa535cae603
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Mar 10 13:49:43 2026 +0000
[Dataflow Streaming] Add a pipeline option to skip input elements that
cannot be decoded successfully (#37762)
An error is logged for each such element, which is otherwise discarded.
Update PaneInfoCoder to throw a CoderException instead of an
ArrayIndexOutOfBoundsException.
---
.../options/DataflowStreamingPipelineOptions.java | 7 ++
.../beam/runners/dataflow/worker/PubsubReader.java | 42 ++++++---
.../dataflow/worker/UngroupedWindmillReader.java | 41 +++++---
.../dataflow/worker/WindmillKeyedWorkItem.java | 105 ++++++++++++++-------
.../worker/WindmillReaderIteratorBase.java | 59 ++++++++----
.../beam/runners/dataflow/worker/WindmillSink.java | 12 ++-
.../dataflow/worker/WindowingWindmillReader.java | 55 +++++++----
.../worker/StreamingDataflowWorkerTest.java | 68 ++++++++++++-
.../dataflow/worker/WindmillKeyedWorkItemTest.java | 97 +++++++++++++++++++
.../worker/WindmillReaderIteratorBaseTest.java | 42 ++++++++-
.../beam/sdk/transforms/windowing/PaneInfo.java | 6 +-
11 files changed, 418 insertions(+), 116 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index ffb2e27e55b..9cc98276f2d 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.joda.time.Duration;
/** [Internal] Options for configuring StreamingDataflowWorker. */
@@ -226,6 +227,12 @@ public interface DataflowStreamingPipelineOptions extends
PipelineOptions {
void setWindmillServiceStreamMaxBackoffMillis(int value);
+ @Description(
+ "If true, log and skip input elements that are unable to successfully
decode from the streaming backend.")
+ ValueProvider<Boolean> getSkipInputElementsWithDecodingExceptions();
+
+ void setSkipInputElementsWithDecodingExceptions(ValueProvider<Boolean>
value);
+
@Description("Enables direct path mode for streaming engine.")
@Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class)
boolean getIsWindmillServiceDirectPathEnabled();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java
index 024b790e8ca..b60cb84415f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubReader.java
@@ -24,6 +24,7 @@ import com.google.auto.service.AutoService;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
+import
org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
@@ -32,6 +33,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.WindowedValue;
@@ -41,26 +43,24 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Immuta
import org.checkerframework.checker.nullness.qual.Nullable;
/** A Reader that receives elements from Pubsub, via a Windmill server. */
-@SuppressWarnings({
- "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
class PubsubReader<T> extends NativeReader<WindowedValue<T>> {
private final Coder<T> coder;
private final StreamingModeExecutionContext context;
// Function used to parse Windmill data.
// If non-null, data from Windmill is expected to be a PubsubMessage
protobuf.
- private final SimpleFunction<PubsubMessage, T> parseFn;
+ private final @Nullable SimpleFunction<PubsubMessage, T> parseFn;
+ private final ValueProvider<Boolean> skipUndecodableElements;
PubsubReader(
Coder<WindowedValue<T>> coder,
StreamingModeExecutionContext context,
- SimpleFunction<PubsubMessage, T> parseFn) {
- @SuppressWarnings({"unchecked", "rawtypes"})
- WindowedValueCoder<T> windowedCoder = (WindowedValueCoder) coder;
+ @Nullable SimpleFunction<PubsubMessage, T> parseFn,
+ ValueProvider<Boolean> skipUndecodableElements) {
+ WindowedValueCoder<T> windowedCoder = (WindowedValueCoder<T>) coder;
this.coder = windowedCoder.getValueCoder();
this.context = context;
this.parseFn = parseFn;
+ this.skipUndecodableElements = skipUndecodableElements;
}
/** A {@link ReaderFactory.Registrar} for pubsub sources. */
@@ -75,19 +75,19 @@ class PubsubReader<T> extends
NativeReader<WindowedValue<T>> {
}
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
static class Factory implements ReaderFactory {
@Override
public NativeReader<?> create(
CloudObject cloudSourceSpec,
- Coder<?> coder,
+ @Nullable Coder<?> coder,
@Nullable PipelineOptions options,
@Nullable DataflowExecutionContext executionContext,
DataflowOperationContext operationContext)
throws Exception {
- coder = checkArgumentNotNull(coder);
- @SuppressWarnings("unchecked")
- Coder<WindowedValue<Object>> typedCoder = (Coder<WindowedValue<Object>>)
coder;
- SimpleFunction<PubsubMessage, Object> parseFn = null;
+ Coder<WindowedValue<Object>> typedCoder =
+ (Coder<WindowedValue<Object>>) checkArgumentNotNull(coder);
+ @Nullable SimpleFunction<PubsubMessage, Object> parseFn = null;
byte[] attributesFnBytes =
getBytes(cloudSourceSpec,
PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, null);
// If attributesFnBytes is set, Pubsub data will be in PubsubMessage
protobuf format. The
@@ -98,8 +98,20 @@ class PubsubReader<T> extends NativeReader<WindowedValue<T>>
{
(SimpleFunction<PubsubMessage, Object>)
SerializableUtils.deserializeFromByteArray(attributesFnBytes,
"serialized fn info");
}
+ @Nullable
+ ValueProvider<Boolean> skipUndecodableElements =
+ (options != null)
+ ? options
+ .as(DataflowStreamingPipelineOptions.class)
+ .getSkipInputElementsWithDecodingExceptions()
+ : null;
return new PubsubReader<>(
- typedCoder, (StreamingModeExecutionContext) executionContext,
parseFn);
+ typedCoder,
+ (StreamingModeExecutionContext)
checkArgumentNotNull(executionContext),
+ parseFn,
+ skipUndecodableElements != null
+ ? skipUndecodableElements
+ : ValueProvider.StaticValueProvider.of(false));
}
}
@@ -110,7 +122,7 @@ class PubsubReader<T> extends
NativeReader<WindowedValue<T>> {
class PubsubReaderIterator extends WindmillReaderIteratorBase<T> {
protected PubsubReaderIterator(Windmill.WorkItem work) {
- super(work);
+ super(work, skipUndecodableElements);
}
@Override
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
index 625dc590d24..2347529cf4a 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.service.AutoService;
import java.io.IOException;
@@ -25,12 +26,14 @@ import java.io.InputStream;
import java.util.Collection;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import
org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
@@ -45,20 +48,21 @@ import org.joda.time.Instant;
/**
* A Reader that receives input data from a Windmill server, and returns it as
individual elements.
*/
-@SuppressWarnings({
- "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
class UngroupedWindmillReader<T> extends NativeReader<WindowedValue<T>> {
private final Coder<T> valueCoder;
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
- private StreamingModeExecutionContext context;
+ private final StreamingModeExecutionContext context;
+ private final ValueProvider<Boolean> skipUndecodableElements;
- UngroupedWindmillReader(Coder<WindowedValue<T>> coder,
StreamingModeExecutionContext context) {
+ UngroupedWindmillReader(
+ Coder<WindowedValue<T>> coder,
+ StreamingModeExecutionContext context,
+ ValueProvider<Boolean> skipUndecodableElements) {
FullWindowedValueCoder<T> inputCoder = (FullWindowedValueCoder<T>) coder;
this.valueCoder = inputCoder.getValueCoder();
this.windowsCoder = inputCoder.getWindowsCoder();
this.context = context;
+ this.skipUndecodableElements = skipUndecodableElements;
}
/** A {@link ReaderFactory.Registrar} for ungrouped windmill sources. */
@@ -75,6 +79,7 @@ class UngroupedWindmillReader<T> extends
NativeReader<WindowedValue<T>> {
}
}
+ @SuppressWarnings({"unchecked", "rawtypes"})
static class Factory implements ReaderFactory {
@Override
public NativeReader<?> create(
@@ -84,11 +89,21 @@ class UngroupedWindmillReader<T> extends
NativeReader<WindowedValue<T>> {
@Nullable DataflowExecutionContext executionContext,
DataflowOperationContext operationContext)
throws Exception {
- coder = checkArgumentNotNull(coder);
- @SuppressWarnings("unchecked")
- Coder<WindowedValue<Object>> typedCoder = (Coder<WindowedValue<Object>>)
coder;
+ Coder<WindowedValue<Object>> typedCoder =
+ (Coder<WindowedValue<Object>>) checkArgumentNotNull(coder);
+ @Nullable
+ ValueProvider<Boolean> skipUndecodableElements =
+ options != null
+ ? options
+ .as(DataflowStreamingPipelineOptions.class)
+ .getSkipInputElementsWithDecodingExceptions()
+ : null;
return new UngroupedWindmillReader<>(
- typedCoder, (StreamingModeExecutionContext) executionContext);
+ typedCoder,
+ (StreamingModeExecutionContext)
checkArgumentNotNull(executionContext),
+ skipUndecodableElements != null
+ ? skipUndecodableElements
+ : ValueProvider.StaticValueProvider.of(false));
}
}
@@ -97,9 +112,9 @@ class UngroupedWindmillReader<T> extends
NativeReader<WindowedValue<T>> {
return new UngroupedWindmillReaderIterator(context.getWorkItem());
}
- class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase {
+ class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase<T> {
UngroupedWindmillReaderIterator(Windmill.WorkItem work) {
- super(work);
+ super(work, skipUndecodableElements);
}
@Override
@@ -134,7 +149,7 @@ class UngroupedWindmillReader<T> extends
NativeReader<WindowedValue<T>> {
}
if (valueCoder instanceof KvCoder) {
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
- InputStream key = context.getSerializedKey().newInput();
+ InputStream key = checkNotNull(context.getSerializedKey()).newInput();
notifyElementRead(key.available() + data.available() +
metadata.available());
@SuppressWarnings("unchecked")
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
index 59489babf0b..c328719bfb5 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
@@ -34,6 +34,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
@@ -49,6 +50,8 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Fluent
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An implementation of {@link KeyedWorkItem} that wraps around a {@code
Windmill.WorkItem}.
@@ -56,14 +59,13 @@ import org.joda.time.Instant;
* @param <K> the key type
* @param <ElemT> the element type
*/
-@SuppressWarnings({
- "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+@Internal
public class WindmillKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K,
ElemT> {
private static final Predicate<Timer> IS_WATERMARK =
input -> input.getType() == Timer.Type.WATERMARK;
+ private static final Logger LOG =
LoggerFactory.getLogger(WindmillKeyedWorkItem.class);
+
private final Windmill.WorkItem workItem;
private final K key;
// used to inform that timer was caused by drain
@@ -73,6 +75,7 @@ public class WindmillKeyedWorkItem<K, ElemT> implements
KeyedWorkItem<K, ElemT>
private final transient Coder<Collection<? extends BoundedWindow>>
windowsCoder;
private final transient Coder<ElemT> valueCoder;
private final WindmillTagEncoding windmillTagEncoding;
+ private final boolean skipUndecodableElements;
public WindmillKeyedWorkItem(
K key,
@@ -82,6 +85,26 @@ public class WindmillKeyedWorkItem<K, ElemT> implements
KeyedWorkItem<K, ElemT>
Coder<ElemT> valueCoder,
WindmillTagEncoding windmillTagEncoding,
boolean drainMode) {
+ this(
+ key,
+ workItem,
+ windowCoder,
+ windowsCoder,
+ valueCoder,
+ windmillTagEncoding,
+ drainMode,
+ false);
+ }
+
+ public WindmillKeyedWorkItem(
+ K key,
+ Windmill.WorkItem workItem,
+ Coder<? extends BoundedWindow> windowCoder,
+ Coder<Collection<? extends BoundedWindow>> windowsCoder,
+ Coder<ElemT> valueCoder,
+ WindmillTagEncoding windmillTagEncoding,
+ boolean drainMode,
+ boolean skipUndecodableElements) {
this.key = key;
this.workItem = workItem;
this.windowCoder = windowCoder;
@@ -89,6 +112,7 @@ public class WindmillKeyedWorkItem<K, ElemT> implements
KeyedWorkItem<K, ElemT>
this.valueCoder = valueCoder;
this.windmillTagEncoding = windmillTagEncoding;
this.drainMode = drainMode;
+ this.skipUndecodableElements = skipUndecodableElements;
}
@Override
@@ -113,39 +137,49 @@ public class WindmillKeyedWorkItem<K, ElemT> implements
KeyedWorkItem<K, ElemT>
});
}
+ private @Nullable WindowedValue<ElemT> parseElem(Windmill.Message message) {
+ try {
+ Instant timestamp =
WindmillTimeUtils.windmillToHarnessTimestamp(message.getTimestamp());
+ Collection<? extends BoundedWindow> windows =
+ WindmillSink.decodeMetadataWindows(windowsCoder,
message.getMetadata());
+ PaneInfo paneInfo =
WindmillSink.decodeMetadataPane(message.getMetadata());
+ /**
+ * https://s.apache.org/beam-drain-mode - propagate drain bit if
aggregation/expiry induced by
+ * drain happened upstream
+ */
+ CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
+ if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
+ BeamFnApi.Elements.ElementMetadata elementMetadata =
+ WindmillSink.decodeAdditionalMetadata(windowsCoder,
message.getMetadata());
+ drainingValueFromUpstream =
+ elementMetadata.getDrain() ==
BeamFnApi.Elements.DrainMode.Enum.DRAINING
+ ? CausedByDrain.CAUSED_BY_DRAIN
+ : CausedByDrain.NORMAL;
+ }
+ InputStream inputStream = message.getData().newInput();
+ ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
+ return WindowedValues.of(
+ value, timestamp, windows, paneInfo, null, null,
drainingValueFromUpstream);
+ } catch (RuntimeException | IOException e) {
+ if (!skipUndecodableElements) {
+ throw new RuntimeException(e);
+ }
+ LOG.error(
+ "Skipping input element for work token {} on sharding key {} due to
decoding error",
+ workItem.getWorkToken(),
+ workItem.getShardingKey(),
+ e);
+ return null;
+ }
+ }
+
@Override
+ @SuppressWarnings("nullness")
public Iterable<WindowedValue<ElemT>> elementsIterable() {
return FluentIterable.from(workItem.getMessageBundlesList())
.transformAndConcat(Windmill.InputMessageBundle::getMessagesList)
- .transform(
- message -> {
- try {
- Instant timestamp =
-
WindmillTimeUtils.windmillToHarnessTimestamp(message.getTimestamp());
- Collection<? extends BoundedWindow> windows =
- WindmillSink.decodeMetadataWindows(windowsCoder,
message.getMetadata());
- PaneInfo paneInfo =
WindmillSink.decodeMetadataPane(message.getMetadata());
- /**
- * https://s.apache.org/beam-drain-mode - propagate drain bit
if aggregation/expiry
- * induced by drain happened upstream
- */
- CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
- if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
- BeamFnApi.Elements.ElementMetadata elementMetadata =
- WindmillSink.decodeAdditionalMetadata(windowsCoder,
message.getMetadata());
- drainingValueFromUpstream =
- elementMetadata.getDrain() ==
BeamFnApi.Elements.DrainMode.Enum.DRAINING
- ? CausedByDrain.CAUSED_BY_DRAIN
- : CausedByDrain.NORMAL;
- }
- InputStream inputStream = message.getData().newInput();
- ElemT value = valueCoder.decode(inputStream,
Coder.Context.OUTER);
- return WindowedValues.of(
- value, timestamp, windows, paneInfo, null, null,
drainingValueFromUpstream);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
+ .transform(this::parseElem)
+ .filter(Objects::nonNull);
}
@Override
@@ -237,12 +271,13 @@ public class WindmillKeyedWorkItem<K, ElemT> implements
KeyedWorkItem<K, ElemT>
return kvCoder.getValueCoder();
}
+ @SuppressWarnings("unchecked")
protected FakeKeyedWorkItemCoder(Coder<?> elemCoder) {
if (elemCoder instanceof KeyedWorkItemCoder) {
- KeyedWorkItemCoder kwiCoder = (KeyedWorkItemCoder) elemCoder;
+ KeyedWorkItemCoder<K, ElemT> kwiCoder = (KeyedWorkItemCoder<K, ElemT>)
elemCoder;
this.kvCoder = KvCoder.of(kwiCoder.getKeyCoder(),
kwiCoder.getElementCoder());
} else if (elemCoder instanceof KvCoder) {
- this.kvCoder = ((KvCoder) elemCoder);
+ this.kvCoder = (KvCoder<K, ElemT>) elemCoder;
} else {
throw new IllegalArgumentException(
"FakeKeyedWorkItemCoder only works with KeyedWorkItemCoder or
KvCoder; was: "
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java
index 5e68641bf66..7e6508a4788 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBase.java
@@ -17,27 +17,33 @@
*/
package org.apache.beam.runners.dataflow.worker;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
import java.io.IOException;
import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.values.WindowedValue;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Base class for iterators that decode messages from bundles inside a {@link
Windmill.WorkItem}.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
public abstract class WindmillReaderIteratorBase<T>
extends NativeReader.NativeReaderIterator<WindowedValue<T>> {
- private Windmill.WorkItem work;
+ private final Windmill.WorkItem work;
private int bundleIndex = 0;
private int messageIndex = -1;
- private Optional<WindowedValue<T>> current;
+ private @Nullable WindowedValue<T> current = null;
+ private final ValueProvider<Boolean> skipUndecodableElements;
+ private static final Logger LOG =
LoggerFactory.getLogger(WindmillReaderIteratorBase.class);
- protected WindmillReaderIteratorBase(Windmill.WorkItem work) {
+ protected WindmillReaderIteratorBase(
+ Windmill.WorkItem work, ValueProvider<Boolean> skipUndecodableElements) {
+ this.skipUndecodableElements = skipUndecodableElements;
this.work = work;
}
@@ -48,30 +54,43 @@ public abstract class WindmillReaderIteratorBase<T>
@Override
public boolean advance() throws IOException {
- if (bundleIndex == work.getMessageBundlesCount()
- || messageIndex ==
work.getMessageBundles(bundleIndex).getMessagesCount()) {
- current = Optional.absent();
- return false;
- }
- ++messageIndex;
- for (; bundleIndex < work.getMessageBundlesCount(); ++bundleIndex,
messageIndex = 0) {
+ while (true) {
+ if (bundleIndex >= work.getMessageBundlesCount()) {
+ current = null;
+ return false;
+ }
Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex);
- if (messageIndex < bundle.getMessagesCount()) {
- current = Optional.of(decodeMessage(bundle.getMessages(messageIndex)));
+ ++messageIndex;
+ if (messageIndex >= bundle.getMessagesCount()) {
+ messageIndex = -1;
+ ++bundleIndex;
+ continue;
+ }
+ try {
+ current =
checkNotNull(decodeMessage(bundle.getMessages(messageIndex)));
return true;
+ } catch (RuntimeException | IOException e) {
+ if (skipUndecodableElements.isAccessible()
+ && Boolean.TRUE.equals(skipUndecodableElements.get())) {
+ LOG.error(
+ "Skipping input element for work token {} on sharding key {} due
to decoding error",
+ work.getWorkToken(),
+ work.getShardingKey(),
+ e);
+ continue;
+ }
+ throw e;
}
}
- current = Optional.absent();
- return false;
}
protected abstract WindowedValue<T> decodeMessage(Windmill.Message message)
throws IOException;
@Override
public WindowedValue<T> getCurrent() throws NoSuchElementException {
- if (!current.isPresent()) {
+ if (current == null) {
throw new NoSuchElementException();
}
- return current.get();
+ return current;
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index 2ed29125bd4..abee9a33df2 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -214,7 +214,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
}
@Override
- @SuppressWarnings({"rawtypes", "NestedInstanceOfConditions"})
+ @SuppressWarnings("rawtypes")
public long add(WindowedValue<T> data) throws IOException {
ByteString key, value;
ByteString id = ByteString.EMPTY;
@@ -228,16 +228,18 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
KvCoder kvCoder = (KvCoder) valueCoder;
KV kv = checkNotNull((KV) data.getValue());
key = encode(kvCoder.getKeyCoder(), kv.getKey());
- Coder valueCoder = kvCoder.getValueCoder();
+ Coder nestedValueCoder = kvCoder.getValueCoder();
// If ids are explicitly provided, use that instead of the
windmill-generated id.
// This is used when reading an UnboundedSource to deduplicate records.
- if (valueCoder instanceof ValueWithRecordId.ValueWithRecordIdCoder) {
+ if (nestedValueCoder instanceof
ValueWithRecordId.ValueWithRecordIdCoder) {
ValueWithRecordId valueAndId = checkNotNull((ValueWithRecordId)
kv.getValue());
value =
- encode(((ValueWithRecordIdCoder) valueCoder).getValueCoder(),
valueAndId.getValue());
+ encode(
+ ((ValueWithRecordIdCoder) nestedValueCoder).getValueCoder(),
+ valueAndId.getValue());
id = ByteString.copyFrom(valueAndId.getId());
} else {
- value = encode(valueCoder, kv.getValue());
+ value = encode(nestedValueCoder, kv.getValue());
}
} else {
key = checkNotNull(context.getSerializedKey());
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
index 7dd55d91211..173b254f639 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import com.google.auto.service.AutoService;
import java.io.IOException;
@@ -25,12 +26,15 @@ import java.util.Collection;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.beam.runners.core.KeyedWorkItem;
+import
org.apache.beam.runners.dataflow.options.DataflowStreamingPipelineOptions;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
@@ -42,10 +46,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
* A Reader that receives input data from a Windmill server, and returns a
singleton iterable
* containing the work item.
*/
-@SuppressWarnings({
- "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+@Internal
class WindowingWindmillReader<K, T> extends
NativeReader<WindowedValue<KeyedWorkItem<K, T>>> {
private final Coder<K> keyCoder;
@@ -53,19 +54,22 @@ class WindowingWindmillReader<K, T> extends
NativeReader<WindowedValue<KeyedWork
private final Coder<? extends BoundedWindow> windowCoder;
private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
private StreamingModeExecutionContext context;
+ private final ValueProvider<Boolean> skipUndecodableElements;
WindowingWindmillReader(
- Coder<WindowedValue<KeyedWorkItem<K, T>>> coder,
StreamingModeExecutionContext context) {
+ Coder<WindowedValue<KeyedWorkItem<K, T>>> coder,
+ StreamingModeExecutionContext context,
+ ValueProvider<Boolean> skipUndecodableElements) {
FullWindowedValueCoder<KeyedWorkItem<K, T>> inputCoder =
(FullWindowedValueCoder<KeyedWorkItem<K, T>>) coder;
this.windowsCoder = inputCoder.getWindowsCoder();
this.windowCoder = inputCoder.getWindowCoder();
- @SuppressWarnings("unchecked")
WindmillKeyedWorkItem.FakeKeyedWorkItemCoder<K, T> keyedWorkItemCoder =
(WindmillKeyedWorkItem.FakeKeyedWorkItemCoder<K, T>)
inputCoder.getValueCoder();
this.keyCoder = keyedWorkItemCoder.getKeyCoder();
this.valueCoder = keyedWorkItemCoder.getElementCoder();
this.context = context;
+ this.skipUndecodableElements = skipUndecodableElements;
}
/** A {@link ReaderFactory.Registrar} for grouping windmill sources. */
@@ -87,6 +91,7 @@ class WindowingWindmillReader<K, T> extends
NativeReader<WindowedValue<KeyedWork
static class Factory implements ReaderFactory {
@Override
+ @SuppressWarnings("rawtypes")
public NativeReader<?> create(
CloudObject spec,
@Nullable Coder<?> coder,
@@ -94,14 +99,22 @@ class WindowingWindmillReader<K, T> extends
NativeReader<WindowedValue<KeyedWork
@Nullable DataflowExecutionContext context,
DataflowOperationContext operationContext)
throws Exception {
- coder = checkArgumentNotNull(coder);
- @SuppressWarnings({
- "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
- "unchecked"
- })
+ @SuppressWarnings("unchecked")
Coder<WindowedValue<KeyedWorkItem<Object, Object>>> typedCoder =
- (Coder<WindowedValue<KeyedWorkItem<Object, Object>>>) coder;
- return WindowingWindmillReader.create(typedCoder,
(StreamingModeExecutionContext) context);
+ (Coder<WindowedValue<KeyedWorkItem<Object, Object>>>)
checkArgumentNotNull(coder);
+ @Nullable
+ ValueProvider<Boolean> skipUndecodableElements =
+ (options != null)
+ ? options
+ .as(DataflowStreamingPipelineOptions.class)
+ .getSkipInputElementsWithDecodingExceptions()
+ : null;
+ return WindowingWindmillReader.create(
+ typedCoder,
+ (StreamingModeExecutionContext) checkArgumentNotNull(context),
+ skipUndecodableElements != null
+ ? skipUndecodableElements
+ : ValueProvider.StaticValueProvider.of(false));
}
}
@@ -110,13 +123,17 @@ class WindowingWindmillReader<K, T> extends
NativeReader<WindowedValue<KeyedWork
* StreamingModeExecutionContext}.
*/
public static <K, T> WindowingWindmillReader<K, T> create(
- Coder<WindowedValue<KeyedWorkItem<K, T>>> coder,
StreamingModeExecutionContext context) {
- return new WindowingWindmillReader<K, T>(coder, context);
+ Coder<WindowedValue<KeyedWorkItem<K, T>>> coder,
+ StreamingModeExecutionContext context,
+ ValueProvider<Boolean> skipUndecodableElements) {
+ return new WindowingWindmillReader<>(coder, context,
skipUndecodableElements);
}
@Override
public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator()
throws IOException {
- final K key = keyCoder.decode(context.getSerializedKey().newInput(),
Coder.Context.OUTER);
+ final K key =
+ keyCoder.decode(
+ checkStateNotNull(context.getSerializedKey()).newInput(),
Coder.Context.OUTER);
final WorkItem workItem = context.getWorkItem();
KeyedWorkItem<K, T> keyedWorkItem =
new WindmillKeyedWorkItem<>(
@@ -126,7 +143,9 @@ class WindowingWindmillReader<K, T> extends
NativeReader<WindowedValue<KeyedWork
windowsCoder,
valueCoder,
context.getWindmillTagEncoding(),
- context.getDrainMode());
+ context.getDrainMode(),
+ skipUndecodableElements.isAccessible()
+ && Boolean.TRUE.equals(skipUndecodableElements.get()));
final boolean isEmptyWorkItem =
(Iterables.isEmpty(keyedWorkItem.timersIterable())
&& Iterables.isEmpty(keyedWorkItem.elementsIterable()));
@@ -152,7 +171,7 @@ class WindowingWindmillReader<K, T> extends
NativeReader<WindowedValue<KeyedWork
};
} else {
return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
- private WindowedValue<KeyedWorkItem<K, T>> current;
+ private @Nullable WindowedValue<KeyedWorkItem<K, T>> current = null;
@Override
public boolean start() throws IOException {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index 15304e5cb9f..fafa27f98fc 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -148,6 +148,7 @@ import org.apache.beam.sdk.coders.CollectionCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TextualIntegerCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -223,9 +224,7 @@ import org.slf4j.LoggerFactory;
/** Unit tests for {@link StreamingDataflowWorker}. */
@RunWith(Parameterized.class)
-// TODO(https://github.com/apache/beam/issues/21230): Remove when new version
of errorprone is
-// released (2.11.0)
-@SuppressWarnings({"unused", "deprecation"})
+@SuppressWarnings("deprecation")
public class StreamingDataflowWorkerTest {
private static final Logger LOG =
LoggerFactory.getLogger(StreamingDataflowWorkerTest.class);
@@ -857,6 +856,7 @@ public class StreamingDataflowWorkerTest {
if (streamingEngine) {
argsList.add("--experiments=enable_streaming_engine");
}
+ LOG.info("Running with args {}", argsList);
DataflowWorkerHarnessOptions options =
PipelineOptionsFactory.fromArgs(argsList.toArray(new String[0]))
.as(DataflowWorkerHarnessOptions.class);
@@ -870,7 +870,6 @@ public class StreamingDataflowWorkerTest {
if (options.getActiveWorkRefreshPeriodMillis() == 10000) {
options.setActiveWorkRefreshPeriodMillis(0);
}
-
return options;
}
@@ -4318,6 +4317,67 @@ public class StreamingDataflowWorkerTest {
}
}
+ @Test
+ public void testSkipInputElementsWithDecodingExceptions() throws Exception {
+ KvCoder<String, Integer> kvCoder = KvCoder.of(StringUtf8Coder.of(),
TextualIntegerCoder.of());
+ List<ParallelInstruction> instructions =
+ Arrays.asList(makeSourceInstruction(kvCoder),
makeSinkInstruction(kvCoder, 0));
+
+ StreamingDataflowWorker worker =
+ makeWorker(
+
defaultWorkerParams("--skipInputElementsWithDecodingExceptions=true")
+ .setInstructions(instructions)
+ .publishCounters()
+ .build());
+ worker.start();
+
+ // Create a work item with one valid message and one corrupted message.
+ Windmill.GetWorkResponse.Builder builder =
Windmill.GetWorkResponse.newBuilder();
+ Windmill.ComputationWorkItems.Builder computationBuilder =
+
builder.addWorkBuilder().setComputationId(DEFAULT_COMPUTATION_ID).setInputDataWatermark(1);
+ Windmill.WorkItem.Builder workItemBuilder =
+ computationBuilder
+ .addWorkBuilder()
+ .setKey(DEFAULT_KEY_BYTES)
+ .setShardingKey(DEFAULT_SHARDING_KEY)
+ .setWorkToken(1)
+ .setCacheToken(2);
+
+ Windmill.InputMessageBundle.Builder bundleBuilder =
+ workItemBuilder
+ .addMessageBundlesBuilder()
+ .setSourceComputationId(DEFAULT_SOURCE_COMPUTATION_ID);
+
+ // Valid message
+ bundleBuilder
+ .addMessagesBuilder()
+ .setTimestamp(0)
+ .setData(ByteString.copyFromUtf8("12345")) // asserted as the sole output below
+ .setMetadata(addPaneTag(PaneInfo.NO_FIRING,
intervalWindowBytes(DEFAULT_WINDOW)));
+
+ // Corrupted message (invalid data for kvCoder)
+ bundleBuilder
+ .addMessagesBuilder()
+ .setTimestamp(1000)
+ .setData(ByteString.copyFromUtf8("54321corrupted data"))
+ .setMetadata(addPaneTag(PaneInfo.NO_FIRING,
intervalWindowBytes(DEFAULT_WINDOW)));
+
+ server.whenGetWorkCalled().thenReturn(builder.build());
+
+ Map<Long, Windmill.WorkItemCommitRequest> result =
server.waitForAndGetCommits(1);
+ worker.stop();
+
+ assertTrue(result.containsKey(1L)); // presumably keyed by the work token (1)
+ Windmill.WorkItemCommitRequest commit = result.get(1L);
+
+ // Verify that only the valid message was processed and output.
+ assertEquals(1, commit.getOutputMessagesCount());
+ assertEquals(1,
commit.getOutputMessages(0).getBundles(0).getMessagesCount());
+ assertEquals("key",
commit.getOutputMessages(0).getBundles(0).getKey().toStringUtf8());
+ assertEquals(
+ "12345",
commit.getOutputMessages(0).getBundles(0).getMessages(0).getData().toStringUtf8());
+ }
+
static class BlockingFn extends DoFn<String, String> implements TestRule {
public static AtomicReference<CountDownLatch> blocker =
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
index 69aa4d0d69a..c1568058435 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
@@ -18,7 +18,9 @@
package org.apache.beam.runners.dataflow.worker;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -118,6 +120,93 @@ public class WindmillKeyedWorkItemTest {
WindowedValues.of("earth", new Instant(6), WINDOW_1,
paneInfo(1))));
}
+ @Test
+ public void testElementIterationWithSkipEnabled() throws Exception {
+ Windmill.WorkItem.Builder workItem =
+ Windmill.WorkItem.newBuilder().setKey(SERIALIZED_KEY).setWorkToken(17);
+ Windmill.InputMessageBundle.Builder chunk1 =
workItem.addMessageBundlesBuilder();
+ chunk1.setSourceComputationId("computation");
+ addElement(chunk1, 5, "hello", WINDOW_1, paneInfo(0));
+ addElement(chunk1, 7, "world", WINDOW_2, paneInfo(2));
+ Windmill.InputMessageBundle.Builder chunk2 =
workItem.addMessageBundlesBuilder();
+ chunk2.setSourceComputationId("computation");
+ addElement(chunk2, 6, "earth", WINDOW_1, paneInfo(1));
+
+ KeyedWorkItem<String, String> keyedWorkItem =
+ new WindmillKeyedWorkItem<>(
+ KEY,
+ workItem.build(),
+ WINDOW_CODER,
+ WINDOWS_CODER,
+ VALUE_CODER,
+ windmillTagEncoding,
+ false, // drain mode
+ true); // skip undecodable elements
+
+ assertThat(
+ keyedWorkItem.elementsIterable(),
+ Matchers.contains( // every valid element kept, in input order
+ WindowedValues.of("hello", new Instant(5), WINDOW_1, paneInfo(0)),
+ WindowedValues.of("world", new Instant(7), WINDOW_2, paneInfo(2)),
+ WindowedValues.of("earth", new Instant(6), WINDOW_1,
paneInfo(1))));
+ }
+
+ @Test
+ public void testElementIterationSkips() throws Exception {
+ Windmill.WorkItem.Builder workItem =
+ Windmill.WorkItem.newBuilder().setKey(SERIALIZED_KEY).setWorkToken(17);
+ Windmill.InputMessageBundle.Builder chunk1 =
workItem.addMessageBundlesBuilder();
+ chunk1.setSourceComputationId("computation");
+ addElement(chunk1, 5, "hello", WINDOW_1, paneInfo(0));
+ addCorruptedElement(chunk1); // expected to be skipped
+ Windmill.InputMessageBundle.Builder chunk2 =
workItem.addMessageBundlesBuilder();
+ chunk2.setSourceComputationId("computation");
+ addElement(chunk2, 6, "earth", WINDOW_1, paneInfo(1));
+
+ KeyedWorkItem<String, String> keyedWorkItem =
+ new WindmillKeyedWorkItem<>(
+ KEY,
+ workItem.build(),
+ WINDOW_CODER,
+ WINDOWS_CODER,
+ VALUE_CODER,
+ windmillTagEncoding,
+ false, // drain mode
+ true); // skip undecodable elements
+
+ assertThat(
+ keyedWorkItem.elementsIterable(),
+ Matchers.contains( // the corrupted element is absent
+ WindowedValues.of("hello", new Instant(5), WINDOW_1, paneInfo(0)),
+ WindowedValues.of("earth", new Instant(6), WINDOW_1,
paneInfo(1))));
+ }
+
+ @Test
+ public void testElementIterationAllSkips() throws Exception {
+ Windmill.WorkItem.Builder workItem =
+ Windmill.WorkItem.newBuilder().setKey(SERIALIZED_KEY).setWorkToken(17);
+ Windmill.InputMessageBundle.Builder chunk1 =
workItem.addMessageBundlesBuilder();
+ chunk1.setSourceComputationId("computation");
+ addCorruptedElement(chunk1); // every element in this item is undecodable
+ addCorruptedElement(chunk1);
+ Windmill.InputMessageBundle.Builder chunk2 =
workItem.addMessageBundlesBuilder();
+ chunk2.setSourceComputationId("computation");
+ addCorruptedElement(chunk2);
+
+ KeyedWorkItem<String, String> keyedWorkItem =
+ new WindmillKeyedWorkItem<>(
+ KEY,
+ workItem.build(),
+ WINDOW_CODER,
+ WINDOWS_CODER,
+ VALUE_CODER,
+ windmillTagEncoding,
+ false, // drain mode
+ true); // skip undecodable elements
+
+ assertTrue(Iterables.isEmpty(keyedWorkItem.elementsIterable()));
+ }
+
private void addElement(
Windmill.InputMessageBundle.Builder chunk,
long timestamp,
@@ -156,6 +245,14 @@ public class WindmillKeyedWorkItemTest {
.setMetadata(encodedMetadata);
}
+ private void addCorruptedElement(Windmill.InputMessageBundle.Builder chunk) {
+ chunk
+ .addMessagesBuilder()
+ .setTimestamp(1)
+ .setData(ByteString.copyFromUtf8("bad data"))
+ .setMetadata(ByteString.copyFromUtf8("bad metadata")); // presumably fails pane decoding
+ }
+
private PaneInfo paneInfo(int index) {
return PaneInfo.createPane(false, false, Timing.EARLY, index, -1);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java
index b6a4cb86c68..61e2f4250d0 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillReaderIteratorBaseTest.java
@@ -24,7 +24,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
@@ -36,12 +38,16 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class WindmillReaderIteratorBaseTest {
private static class TestWindmillReaderIterator extends
WindmillReaderIteratorBase<Long> {
- protected TestWindmillReaderIterator(Windmill.WorkItem work) {
- super(work);
+ protected TestWindmillReaderIterator(
+ Windmill.WorkItem work, ValueProvider<Boolean>
skipUndecodableElements) {
+ super(work, skipUndecodableElements);
}
@Override
- protected WindowedValue<Long> decodeMessage(Windmill.Message message) {
+ protected WindowedValue<Long> decodeMessage(Windmill.Message message)
throws CoderException {
+ if (message.getTimestamp() < 0) {
+ throw new CoderException("Injected decoding error to test skipping.");
+ }
return WindowedValues.valueInGlobalWindow(message.getTimestamp());
}
}
@@ -60,7 +66,26 @@ public class WindmillReaderIteratorBaseTest {
testForMessageBundleCounts(0, 0, 1, 3, 0, 1, 0, 0, 0, 0);
}
+ @Test
+ public void testSkipErrors() throws IOException {
+ testForMessageBundleCounts(true);
+ testForMessageBundleCounts(true, 0);
+ testForMessageBundleCounts(true, 0, 0);
+ testForMessageBundleCounts(true, 1);
+ testForMessageBundleCounts(true, 2);
+ testForMessageBundleCounts(true, 1, 1);
+ testForMessageBundleCounts(true, 0, 1);
+ testForMessageBundleCounts(true, 1, 0);
+ testForMessageBundleCounts(true, 0, 0, 1, 3, 0, 1, 0, 0, 0, 1);
+ testForMessageBundleCounts(true, 0, 0, 1, 3, 0, 1, 0, 0, 0, 0);
+ }
+
private void testForMessageBundleCounts(int... messageBundleCounts) throws
IOException {
+ testForMessageBundleCounts(false, messageBundleCounts);
+ }
+
+ private void testForMessageBundleCounts(boolean skipErrors, int...
messageBundleCounts)
+ throws IOException {
List<Windmill.InputMessageBundle> bundles = new ArrayList<>();
long numTotalMessages = 0;
for (int count : messageBundleCounts) {
@@ -73,6 +98,12 @@ public class WindmillReaderIteratorBaseTest {
.setData(ByteString.EMPTY)
.build());
}
+ // Append an undecodable message to every even-indexed bundle. This is deterministic so that
+ // failures reproduce, and a zero count yields a bundle holding only undecodable messages.
+ if (skipErrors && bundles.size() % 2 == 0) {
+ bundle.addMessages(
+
Windmill.Message.newBuilder().setTimestamp(-10).setData(ByteString.EMPTY).build());
+ }
bundles.add(bundle.build());
}
Windmill.WorkItem workItem =
@@ -81,7 +112,9 @@ public class WindmillReaderIteratorBaseTest {
.setWorkToken(0L)
.addAllMessageBundles(bundles)
.build();
- try (TestWindmillReaderIterator iter = new
TestWindmillReaderIterator(workItem)) {
+ try (TestWindmillReaderIterator iter =
+ new TestWindmillReaderIterator(
+ workItem, ValueProvider.StaticValueProvider.of(skipErrors))) {
List<Long> actual =
ReaderTestUtils.windowedValuesToValues(
ReaderUtils.readRemainingFromIterator(iter, false));
@@ -90,7 +123,7 @@ public class WindmillReaderIteratorBaseTest {
for (int i = 0; i < numTotalMessages; ++i) {
expected.add((long) i);
}
- assertEquals(Arrays.toString(messageBundleCounts), expected, actual);
+ assertEquals(Arrays.toString(messageBundleCounts) + skipErrors,
expected, actual);
}
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index f253d179483..4805122035a 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -341,7 +341,17 @@ public final class PaneInfo {
tag = (byte) (ordinal() << 4);
}
- public static Encoding fromTag(byte b) {
+ /**
+ * Returns the {@link Encoding} whose tag ({@code ordinal() << 4}) is stored in {@code b}.
+ *
+ * @throws CoderException if {@code b} does not carry a known encoding, e.g. corrupt input.
+ */
+ public static Encoding fromTag(byte b) throws CoderException {
+ Encoding[] encodings = values();
+ int index = b >> 4;
+ if (index < 0 || index >= encodings.length) {
+ throw new CoderException("Invalid pane encoding " + index);
+ }
- return Encoding.values()[b >> 4];
+ return encodings[index];
}
}