This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 6f3bcd9 [BEAM-7012] Support TestStream in streaming Flink Runner new c44c30f Merge pull request #8383: [BEAM-7012] Support TestStream in streaming Flink Runner 6f3bcd9 is described below commit 6f3bcd932d4fe70711cb3653192171dad67867dc Author: Maximilian Michels <m...@apache.org> AuthorDate: Tue Apr 23 18:49:26 2019 +0200 [BEAM-7012] Support TestStream in streaming Flink Runner TestStream is a way to construct a stream with control over elements and time. In total, 40 ValidatesRunner tests make use of this feature (tagged via UsesTestStream). Until now, only the DirectRunner supported TestStream, which meant that those tests were not executed for other Runners, e.g. Flink. Implementing TestStream for Flink was not hard, except for the processing-time functionality, for which no clean solution seems possible. However, only 2 of the 40 UsesTestStream tests make use of processing time. A new JUnit test category, UsesTestStreamWithProcessingTime (a sub-category of UsesTestStream), was added so that Runners can exclude those tests. This still enables 38 additional ValidatesRunner tests to run in Flink streaming mode. 
--- runners/flink/flink_runner.gradle | 3 +- .../flink/FlinkStreamingTransformTranslators.java | 37 +++++++++ .../wrappers/streaming/io/TestStreamSource.java | 80 ++++++++++++++++++++ .../org/apache/beam/sdk/testing/TestStream.java | 88 ++++++++++++++++++++++ .../apache/beam/sdk/testing/UsesTestStream.java | 2 +- ....java => UsesTestStreamWithProcessingTime.java} | 7 +- .../apache/beam/sdk/testing/TestStreamTest.java | 21 +++++- .../apache/beam/sdk/transforms/GroupByKeyTest.java | 4 +- .../org/apache/beam/sdk/transforms/ParDoTest.java | 7 +- 9 files changed, 240 insertions(+), 9 deletions(-) diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index db6d3e3..4bc9944 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -151,10 +151,11 @@ def createValidatesRunnerTask(Map m) { excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics' if (config.streaming) { excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse' + excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime' } else { excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' + excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' } - excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' } } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 8d42d18..760efdd 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -49,14 +49,17 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDo import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator; import 
org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestStreamSource; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; import org.apache.beam.sdk.transforms.DoFn; @@ -70,6 +73,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.AppliedCombineFn; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; @@ -147,6 +151,8 @@ class FlinkStreamingTransformTranslators { TRANSLATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator()); TRANSLATORS.put( PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new CombinePerKeyTranslator()); + + TRANSLATORS.put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, new TestStreamTranslator()); } public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator( @@ -1284,6 +1290,37 @@ class FlinkStreamingTransformTranslators { } } + /** A translator to support {@link TestStream} with Flink. 
*/ + private static class TestStreamTranslator<T> + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TestStream<T>> { + + @Override + void translateNode(TestStream<T> testStream, FlinkStreamingTranslationContext context) { + Coder<T> valueCoder = testStream.getValueCoder(); + + // Coder for the Elements in the TestStream + TestStream.TestStreamCoder<T> testStreamCoder = TestStream.TestStreamCoder.of(valueCoder); + final byte[] payload; + try { + payload = CoderUtils.encodeToByteArray(testStreamCoder, testStream); + } catch (CoderException e) { + throw new RuntimeException("Could not encode TestStream.", e); + } + + WindowedValue.FullWindowedValueCoder<T> elementCoder = + WindowedValue.getFullCoder(valueCoder, GlobalWindow.Coder.INSTANCE); + + DataStreamSource<WindowedValue<T>> source = + context + .getExecutionEnvironment() + .addSource( + new TestStreamSource<>(testStreamCoder, payload), + new CoderTypeInformation<>(elementCoder)); + + context.setOutputDataStream(context.getOutput(testStream), source); + } + } + /** * Wrapper for {@link UnboundedSourceWrapper}, which simplifies output type, namely, removes * {@link ValueWithRecordId}. diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestStreamSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestStreamSource.java new file mode 100644 index 0000000..04c6604 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestStreamSource.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io; + +import java.util.List; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.joda.time.Instant; + +/** Flink source for executing {@link org.apache.beam.sdk.testing.TestStream}. 
*/ +public class TestStreamSource<T> extends RichSourceFunction<WindowedValue<T>> { + + private final TestStream.TestStreamCoder<T> testStreamCoder; + private final byte[] payload; + + private volatile boolean isRunning = true; + + public TestStreamSource(TestStream.TestStreamCoder<T> testStreamCoder, byte[] payload) { + this.testStreamCoder = testStreamCoder; + this.payload = payload; + } + + @Override + public void run(SourceContext<WindowedValue<T>> ctx) throws CoderException { + TestStream<T> testStream = CoderUtils.decodeFromByteArray(testStreamCoder, payload); + List<TestStream.Event<T>> events = testStream.getEvents(); + + for (int eventId = 0; isRunning && eventId < events.size(); eventId++) { + TestStream.Event<T> event = events.get(eventId); + + synchronized (ctx.getCheckpointLock()) { + if (event instanceof TestStream.ElementEvent) { + for (TimestampedValue<T> element : ((TestStream.ElementEvent<T>) event).getElements()) { + Instant timestamp = element.getTimestamp(); + WindowedValue<T> value = + WindowedValue.of( + element.getValue(), timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING); + ctx.collectWithTimestamp(value, timestamp.getMillis()); + } + } else if (event instanceof TestStream.WatermarkEvent) { + long millis = ((TestStream.WatermarkEvent<T>) event).getWatermark().getMillis(); + ctx.emitWatermark(new Watermark(millis)); + } else if (event instanceof TestStream.ProcessingTimeEvent) { + // There seems to be no clean way to implement this + throw new UnsupportedOperationException( + "Advancing Processing time is not supported by the Flink Runner."); + } else { + throw new IllegalStateException("Unknown event type " + event); + } + } + } + } + + @Override + public void cancel() { + this.isRunning = false; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java index f137b6f..6ab4c0f 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java @@ -21,12 +21,21 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.DurationCoder; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.transforms.PTransform; @@ -39,6 +48,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; import org.joda.time.Duration; import org.joda.time.Instant; @@ -300,4 +310,82 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> { public int hashCode() { return Objects.hash(TestStream.class, getValueCoder(), getEvents()); } + + /** Coder for {@link TestStream}. 
*/ + public static class TestStreamCoder<T> extends StructuredCoder<TestStream<T>> { + + private final TimestampedValue.TimestampedValueCoder<T> elementCoder; + + public static <T> TestStreamCoder<T> of(Coder<T> valueCoder) { + return new TestStreamCoder<>(valueCoder); + } + + private TestStreamCoder(Coder<T> valueCoder) { + this.elementCoder = TimestampedValue.TimestampedValueCoder.of(valueCoder); + } + + @Override + public void encode(TestStream<T> value, OutputStream outStream) throws IOException { + List<Event<T>> events = value.getEvents(); + VarIntCoder.of().encode(events.size(), outStream); + + for (Event event : events) { + if (event instanceof ElementEvent) { + outStream.write(event.getType().ordinal()); + Iterable<TimestampedValue<T>> elements = ((ElementEvent) event).getElements(); + VarIntCoder.of().encode(Iterables.size(elements), outStream); + for (TimestampedValue<T> element : elements) { + elementCoder.encode(element, outStream); + } + } else if (event instanceof WatermarkEvent) { + outStream.write(event.getType().ordinal()); + Instant watermark = ((WatermarkEvent) event).getWatermark(); + InstantCoder.of().encode(watermark, outStream); + } else if (event instanceof ProcessingTimeEvent) { + outStream.write(event.getType().ordinal()); + Duration processingTimeAdvance = ((ProcessingTimeEvent) event).getProcessingTimeAdvance(); + DurationCoder.of().encode(processingTimeAdvance, outStream); + } + } + } + + @Override + public TestStream<T> decode(InputStream inStream) throws IOException { + Integer numberOfEvents = VarIntCoder.of().decode(inStream); + List<Event<T>> events = new ArrayList<>(numberOfEvents); + + for (int i = 0; i < numberOfEvents; i++) { + EventType eventType = EventType.values()[inStream.read()]; + switch (eventType) { + case ELEMENT: + int numElements = VarIntCoder.of().decode(inStream); + List<TimestampedValue<T>> elements = new ArrayList<>(numElements); + for (int j = 0; j < numElements; j++) { + 
elements.add(elementCoder.decode(inStream)); + } + events.add(ElementEvent.add(elements)); + break; + case WATERMARK: + Instant watermark = InstantCoder.of().decode(inStream); + events.add(WatermarkEvent.advanceTo(watermark)); + break; + case PROCESSING_TIME: + Duration duration = DurationCoder.of().decode(inStream).toDuration(); + events.add(ProcessingTimeEvent.advanceBy(duration)); + break; + default: + throw new IllegalStateException("Unknown event type + " + eventType); + } + } + return TestStream.fromRawEvents(elementCoder.getValueCoder(), events); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Collections.singletonList(elementCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException {} + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java index c3f6e26..97e7f08 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java @@ -19,6 +19,6 @@ package org.apache.beam.sdk.testing; /** * Category tag for tests that use {@link TestStream}, which is not a part of the Beam model but a - * special feature currently only implemented by the direct runner. + * special feature currently only implemented by the direct runner and the Flink Runner (streaming). 
*/ public interface UsesTestStream {} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStreamWithProcessingTime.java similarity index 74% copy from sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java copy to sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStreamWithProcessingTime.java index c3f6e26..164e9fd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStreamWithProcessingTime.java @@ -18,7 +18,8 @@ package org.apache.beam.sdk.testing; /** - * Category tag for tests that use {@link TestStream}, which is not a part of the Beam model but a - * special feature currently only implemented by the direct runner. + * Subcategory for {@link UsesTestStream} tests which use the processing time feature of {@link + * TestStream}. Some Runners do not support setting processing time globally in a way that {@link + * TestStream} demands it. 
*/ -public interface UsesTestStream {} +public interface UsesTestStreamWithProcessingTime extends UsesTestStream {} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java index e1a32fa..e21acaf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java @@ -17,11 +17,11 @@ */ package org.apache.beam.sdk.testing; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.junit.Assert.assertThat; import java.io.Serializable; import java.util.stream.StreamSupport; @@ -48,6 +48,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Never; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; @@ -384,4 +385,22 @@ public class TestStreamTest implements Serializable { p.run().waitUntilFinish(); } + + @Test + public void testCoder() throws Exception { + TestStream<String> testStream = + TestStream.create(StringUtf8Coder.of()) + .addElements("hey") + .advanceWatermarkTo(Instant.ofEpochMilli(22521600)) + .advanceProcessingTime(Duration.millis(42)) + .addElements("hey", "joe") + .advanceWatermarkToInfinity(); + + TestStream.TestStreamCoder<String> coder = TestStream.TestStreamCoder.of(StringUtf8Coder.of()); + + byte[] bytes = CoderUtils.encodeToByteArray(coder, testStream); + TestStream<String> recoveredStream = 
CoderUtils.decodeFromByteArray(coder, bytes); + + assertThat(recoveredStream, is(testStream)); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index adb5096..a0e8311 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -53,7 +53,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; -import org.apache.beam.sdk.testing.UsesTestStream; +import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; @@ -150,7 +150,7 @@ public class GroupByKeyTest implements Serializable { * a spurious output. 
*/ @Test - @Category({ValidatesRunner.class, UsesTestStream.class}) + @Category({ValidatesRunner.class, UsesTestStreamWithProcessingTime.class}) public void testCombiningAccumulatingProcessingTime() throws Exception { PCollection<Integer> triggeredSums = p.apply( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index d552886..84189fc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -85,6 +85,7 @@ import org.apache.beam.sdk.testing.UsesSetState; import org.apache.beam.sdk.testing.UsesSideInputs; import org.apache.beam.sdk.testing.UsesStatefulParDo; import org.apache.beam.sdk.testing.UsesTestStream; +import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime; import org.apache.beam.sdk.testing.UsesTimersInParDo; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.DoFn.OnTimer; @@ -3065,7 +3066,11 @@ public class ParDoTest implements Serializable { * timers when a "set" method is called on it before it goes off. */ @Test - @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + @Category({ + ValidatesRunner.class, + UsesTimersInParDo.class, + UsesTestStreamWithProcessingTime.class + }) public void testProcessingTimeTimerCanBeReset() throws Exception { final String timerId = "foo";