[FLINK-6808] Implement snapshotConfiguration/ensureCompatibility for CoGroupedStreams.UnionSerializer
This closes #4052. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/539787b2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/539787b2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/539787b2 Branch: refs/heads/master Commit: 539787b21822eb839d0408a989cd541450bd08d2 Parents: 4895472 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Authored: Fri Jun 2 15:15:32 2017 +0200 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Wed Jun 7 18:28:58 2017 +0200 ---------------------------------------------------------------------- .../api/datastream/CoGroupedStreams.java | 60 ++++++++++++++++++-- flink-tests/pom.xml | 1 + .../streaming/runtime/CoGroupJoinITCase.java | 47 +++++++++++++++ 3 files changed, 103 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/539787b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index d112260..8dad1cb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -25,10 +25,15 @@ import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -77,7 +82,7 @@ public class CoGroupedStreams<T1, T2> { private final DataStream<T2> input2; /** - * Creates new CoGroped data streams, which are the first step towards building a streaming + * Creates new CoGrouped data streams, which are the first step towards building a streaming * co-group. * * @param input1 The first data stream. @@ -443,8 +448,7 @@ public class CoGroupedStreams<T1, T2> { private final TypeSerializer<T1> oneSerializer; private final TypeSerializer<T2> twoSerializer; - public UnionSerializer(TypeSerializer<T1> oneSerializer, - TypeSerializer<T2> twoSerializer) { + public UnionSerializer(TypeSerializer<T1> oneSerializer, TypeSerializer<T2> twoSerializer) { this.oneSerializer = oneSerializer; this.twoSerializer = twoSerializer; } @@ -553,12 +557,58 @@ public class CoGroupedStreams<T1, T2> { @Override public TypeSerializerConfigSnapshot snapshotConfiguration() { - throw new UnsupportedOperationException("This serializer is not registered for managed state."); + return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer); } @Override public CompatibilityResult<TaggedUnion<T1, T2>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - throw new UnsupportedOperationException("This serializer is not registered for managed state."); + if (configSnapshot instanceof UnionSerializerConfigSnapshot) { + List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs = + ((UnionSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); + + CompatibilityResult<T1> oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult( + previousSerializersAndConfigs.get(0).f0, + UnloadableDummyTypeSerializer.class, + previousSerializersAndConfigs.get(0).f1, + oneSerializer); + + CompatibilityResult<T2> twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult( + previousSerializersAndConfigs.get(1).f0, + UnloadableDummyTypeSerializer.class, + previousSerializersAndConfigs.get(1).f1, + twoSerializer); + + if (!oneSerializerCompatResult.isRequiresMigration() && !twoSerializerCompatResult.isRequiresMigration()) { + return CompatibilityResult.compatible(); + } else if (oneSerializerCompatResult.getConvertDeserializer() != null && twoSerializerCompatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new UnionSerializer<>( + new TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()), + new TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer()))); + } + } + + return CompatibilityResult.requiresMigration(); + } + } + + /** + * The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}. + */ + public static class UnionSerializerConfigSnapshot<T1, T2> extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + /** This empty nullary constructor is required for deserializing the configuration. */ + public UnionSerializerConfigSnapshot() {} + + public UnionSerializerConfigSnapshot(TypeSerializer<T1> oneSerializer, TypeSerializer<T2> twoSerializer) { + super(oneSerializer, twoSerializer); + } + + @Override + public int getVersion() { + return VERSION; } } http://git-wip-us.apache.org/repos/asf/flink/blob/539787b2/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 3c0b184..dd6e949 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -115,6 +115,7 @@ under the License. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> + <type>test-jar</type> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/539787b2/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java index da3de3d..a82b965 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java @@ -19,18 +19,24 @@ package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.CoGroupedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; import org.junit.Assert; @@ -324,6 +330,47 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { Assert.assertEquals(expectedResult, testResults); } + /** + * Verifies that pipelines including {@link CoGroupedStreams} can be checkpointed properly, + * which includes snapshotting configurations of any involved serializers. + * + * @see <a href="https://issues.apache.org/jira/browse/FLINK-6808">FLINK-6808</a> + */ + @Test + public void testCoGroupOperatorWithCheckpoint() throws Exception { + + // generate an operator for the co-group operation + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(1); + + DataStream<Tuple2<String, Integer>> source1 = env.fromElements(Tuple2.of("a", 0), Tuple2.of("b", 3)); + DataStream<Tuple2<String, Integer>> source2 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 6)); + + DataStream<String> coGroupWindow = source1.coGroup(source2) + .where(new Tuple2KeyExtractor()) + .equalTo(new Tuple2KeyExtractor()) + .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() { + @Override + public void coGroup(Iterable<Tuple2<String, Integer>> first, + Iterable<Tuple2<String, Integer>> second, + Collector<String> out) throws Exception { + out.collect(first + ":" + second); + } + }); + + OneInputTransformation<Tuple2<String, Integer>, String> transform = (OneInputTransformation<Tuple2<String, Integer>, String>) coGroupWindow.getTransformation(); + OneInputStreamOperator<Tuple2<String, Integer>, String> operator = transform.getOperator(); + + // wrap the operator in the test harness, and perform a snapshot + OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new Tuple2KeyExtractor(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); + testHarness.snapshot(0L, 0L); + } + private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> { @Override