[FLINK-6830] [cep] Port CEPFrom12MigrationTest for Flink 1.3
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/736e7722 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/736e7722 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/736e7722 Branch: refs/heads/master Commit: 736e7722b2e1a9daab8ae911404c2dad88da596c Parents: 7b3967c Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Authored: Sat Jun 3 23:11:31 2017 +0200 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Wed Jun 7 18:28:58 2017 +0200 ---------------------------------------------------------------------- .../cep/operator/CEPFrom12MigrationTest.java | 606 ------------------ .../flink/cep/operator/CEPMigrationTest.java | 635 +++++++++++++++++++ ...-migration-after-branching-flink1.2-snapshot | Bin 5580 -> 5562 bytes ...-migration-after-branching-flink1.3-snapshot | Bin 0 -> 21980 bytes ...-single-pattern-afterwards-flink1.2-snapshot | Bin 2326 -> 2326 bytes ...-single-pattern-afterwards-flink1.3-snapshot | Bin 0 -> 19770 bytes ...ation-starting-new-pattern-flink1.2-snapshot | Bin 5389 -> 5371 bytes ...ation-starting-new-pattern-flink1.3-snapshot | Bin 0 -> 21788 bytes .../util/migration/MigrationTestUtil.java | 50 ++ .../util/migration/MigrationVersion.java | 43 ++ 10 files changed, 728 insertions(+), 606 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/736e7722/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java deleted file mode 100644 index f5a909b..0000000 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java +++ /dev/null @@ -1,606 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cep.operator; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.cep.Event; -import org.apache.flink.cep.SubEvent; -import org.apache.flink.cep.nfa.NFA; -import org.apache.flink.cep.nfa.compiler.NFACompiler; -import org.apache.flink.cep.pattern.Pattern; -import org.apache.flink.cep.pattern.conditions.SimpleCondition; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.OperatorSnapshotUtil; - -import org.junit.Ignore; -import org.junit.Test; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentLinkedQueue; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Tests for checking whether CEP operator can restore from snapshots that were done - * using the Flink 1.2 operator. - * - * <p>For regenerating the binary snapshot file you have to run the {@code write*()} method on - * the Flink 1.2 branch. - */ - -public class CEPFrom12MigrationTest { - - /** - * Manually run this to write binary snapshot data. - */ - @Ignore - @Test - public void writAfterBranchingPatternSnapshot() throws Exception { - - KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - final Event startEvent = new Event(42, "start", 1.0); - final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); - final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); - - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = - new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); - - try { - harness.setup(); - harness.open(); - - harness.processElement(new StreamRecord<Event>(startEvent, 1)); - harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); - harness - .processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); - harness.processElement(new StreamRecord<Event>(middleEvent1, 2)); - harness.processElement(new StreamRecord<Event>(middleEvent2, 3)); - - harness.processWatermark(new Watermark(5)); - - // do snapshot and save to file - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); - OperatorSnapshotUtil.writeStateHandle(snapshot, - "src/test/resources/cep-migration-after-branching-flink1.2-snapshot"); - } finally { - harness.close(); - } - } - - @Test - public void testRestoreAfterBranchingPattern() throws Exception { - - KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - final Event startEvent = new Event(42, "start", 1.0); - final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); - final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); - final Event endEvent = new Event(42, "end", 1.0); - - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = - new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); - - try { - harness.setup(); - harness.initializeState( - OperatorSnapshotUtil.readStateHandle( - OperatorSnapshotUtil - .getResourceFilename("cep-migration-after-branching-flink1.2-snapshot"))); - harness.open(); - - harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4)); - harness.processElement(new StreamRecord<>(endEvent, 5)); - - harness.processWatermark(new Watermark(20)); - - ConcurrentLinkedQueue<Object> result = harness.getOutput(); - - // watermark and 2 results - assertEquals(3, result.size()); - - Object resultObject1 = result.poll(); - assertTrue(resultObject1 instanceof StreamRecord); - StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1; - assertTrue(resultRecord1.getValue() instanceof Map); - - Object resultObject2 = result.poll(); - assertTrue(resultObject2 instanceof StreamRecord); - StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2; - assertTrue(resultRecord2.getValue() instanceof Map); - - @SuppressWarnings("unchecked") - Map<String, List<Event>> patternMap1 = - (Map<String, List<Event>>) resultRecord1.getValue(); - - assertEquals(startEvent, patternMap1.get("start").get(0)); - assertEquals(middleEvent1, patternMap1.get("middle").get(0)); - assertEquals(endEvent, patternMap1.get("end").get(0)); - - @SuppressWarnings("unchecked") - Map<String, List<Event>> patternMap2 = - (Map<String, List<Event>>) resultRecord2.getValue(); - - assertEquals(startEvent, patternMap2.get("start").get(0)); - assertEquals(middleEvent2, patternMap2.get("middle").get(0)); - assertEquals(endEvent, patternMap2.get("end").get(0)); - - // and now go for a checkpoint with the new serializers - - final Event startEvent1 = new Event(42, "start", 2.0); - final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0); - final Event endEvent1 = new Event(42, "end", 2.0); - - harness.processElement(new StreamRecord<Event>(startEvent1, 21)); - harness.processElement(new StreamRecord<Event>(middleEvent3, 23)); - - // simulate snapshot/restore with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(1L, 1L); - harness.close(); - - harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); - - harness.setup(); - harness.initializeState(snapshot); - harness.open(); - - harness.processElement(new StreamRecord<>(endEvent1, 25)); - - harness.processWatermark(new Watermark(50)); - - result = harness.getOutput(); - - // watermark and the result - assertEquals(2, result.size()); - - Object resultObject3 = result.poll(); - assertTrue(resultObject3 instanceof StreamRecord); - StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3; - assertTrue(resultRecord3.getValue() instanceof Map); - - @SuppressWarnings("unchecked") - Map<String, List<Event>> patternMap3 = - (Map<String, List<Event>>) resultRecord3.getValue(); - - assertEquals(startEvent1, patternMap3.get("start").get(0)); - assertEquals(middleEvent3, patternMap3.get("middle").get(0)); - assertEquals(endEvent1, patternMap3.get("end").get(0)); - } finally { - harness.close(); - } - } - - /** - * Manually run this to write binary snapshot data. - */ - @Ignore - @Test - public void writeStartingNewPatternAfterMigrationSnapshot() throws Exception { - - KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - final Event startEvent1 = new Event(42, "start", 1.0); - final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); - - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = - new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); - - try { - harness.setup(); - harness.open(); - harness.processElement(new StreamRecord<Event>(startEvent1, 1)); - harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); - harness - .processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); - harness.processElement(new StreamRecord<Event>(middleEvent1, 2)); - harness.processWatermark(new Watermark(5)); - - // do snapshot and save to file - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); - OperatorSnapshotUtil.writeStateHandle(snapshot, - "src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot"); - } finally { - harness.close(); - } - } - - @Test - public void testRestoreStartingNewPatternAfterMigration() throws Exception { - - KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - final Event startEvent1 = new Event(42, "start", 1.0); - final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); - final Event startEvent2 = new Event(42, "start", 5.0); - final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); - final Event endEvent = new Event(42, "end", 1.0); - - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = - new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); - - try { - harness.setup(); - harness.initializeState( - OperatorSnapshotUtil.readStateHandle( - OperatorSnapshotUtil.getResourceFilename( - "cep-migration-starting-new-pattern-flink1.2-snapshot"))); - harness.open(); - - harness.processElement(new StreamRecord<>(startEvent2, 5)); - harness.processElement(new StreamRecord<Event>(middleEvent2, 6)); - harness.processElement(new StreamRecord<>(endEvent, 7)); - - harness.processWatermark(new Watermark(20)); - - ConcurrentLinkedQueue<Object> result = harness.getOutput(); - - // watermark and 3 results - assertEquals(4, result.size()); - - Object resultObject1 = result.poll(); - assertTrue(resultObject1 instanceof StreamRecord); - StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1; - assertTrue(resultRecord1.getValue() instanceof Map); - - Object resultObject2 = result.poll(); - assertTrue(resultObject2 instanceof StreamRecord); - StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2; - assertTrue(resultRecord2.getValue() instanceof Map); - - Object resultObject3 = result.poll(); - assertTrue(resultObject3 instanceof StreamRecord); - StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3; - assertTrue(resultRecord3.getValue() instanceof Map); - - @SuppressWarnings("unchecked") - Map<String, List<Event>> patternMap1 = - (Map<String, List<Event>>) resultRecord1.getValue(); - - assertEquals(startEvent1, patternMap1.get("start").get(0)); - assertEquals(middleEvent1, patternMap1.get("middle").get(0)); - assertEquals(endEvent, patternMap1.get("end").get(0)); - - @SuppressWarnings("unchecked") - Map<String, List<Event>> patternMap2 = - (Map<String, List<Event>>) resultRecord2.getValue(); - - assertEquals(startEvent1, patternMap2.get("start").get(0)); - assertEquals(middleEvent2, patternMap2.get("middle").get(0)); - assertEquals(endEvent, patternMap2.get("end").get(0)); - - @SuppressWarnings("unchecked") - Map<String, List<Event>> patternMap3 = - (Map<String, List<Event>>) resultRecord3.getValue(); - - assertEquals(startEvent2, patternMap3.get("start").get(0)); - assertEquals(middleEvent2, patternMap3.get("middle").get(0)); - assertEquals(endEvent, patternMap3.get("end").get(0)); - - // and now go for a checkpoint with the new serializers - - final Event startEvent3 = new Event(42, "start", 2.0); - final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0); - final Event endEvent1 = new Event(42, "end", 2.0); - - harness.processElement(new StreamRecord<Event>(startEvent3, 21)); - harness.processElement(new StreamRecord<Event>(middleEvent3, 23)); - - // simulate snapshot/restore with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(1L, 1L); - harness.close(); - - harness = new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new NFAFactory(), - true), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); - - harness.setup(); - harness.initializeState(snapshot); - harness.open(); - - harness.processElement(new StreamRecord<>(endEvent1, 25)); - - harness.processWatermark(new Watermark(50)); - - result = harness.getOutput(); - - // watermark and the result - assertEquals(2, result.size()); - - Object resultObject4 = result.poll(); - assertTrue(resultObject4 instanceof StreamRecord); - StreamRecord<?> resultRecord4 = (StreamRecord<?>) resultObject4; - assertTrue(resultRecord4.getValue() instanceof Map); - - @SuppressWarnings("unchecked") - Map<String, List<Event>> patternMap4 = - (Map<String, List<Event>>) resultRecord4.getValue(); - - assertEquals(startEvent3, patternMap4.get("start").get(0)); - assertEquals(middleEvent3, patternMap4.get("middle").get(0)); - assertEquals(endEvent1, patternMap4.get("end").get(0)); - } finally { - harness.close(); - } - } - - /** - * Manually run this to write binary snapshot data. - */ - @Ignore - @Test - public void writeSinglePatternAfterMigrationSnapshot() throws Exception { - - KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - final Event startEvent1 = new Event(42, "start", 1.0); - - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = - new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new SinglePatternNFAFactory(), - true), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); - - try { - harness.setup(); - harness.open(); - harness.processWatermark(new Watermark(5)); - - // do snapshot and save to file - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); - OperatorSnapshotUtil.writeStateHandle(snapshot, - "src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot"); - } finally { - harness.close(); - } - } - - @Test - public void testSinglePatternAfterMigration() throws Exception { - - KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { - private static final long serialVersionUID = -4873366487571254798L; - - @Override - public Integer getKey(Event value) throws Exception { - return value.getId(); - } - }; - - final Event startEvent1 = new Event(42, "start", 1.0); - - OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = - new KeyedOneInputStreamOperatorTestHarness<>( - new KeyedCEPPatternOperator<>( - Event.createTypeSerializer(), - false, - IntSerializer.INSTANCE, - new SinglePatternNFAFactory(), - true), - keySelector, - BasicTypeInfo.INT_TYPE_INFO); - - try { - harness.setup(); - harness.initializeState( - OperatorSnapshotUtil.readStateHandle( - OperatorSnapshotUtil.getResourceFilename( - "cep-migration-single-pattern-afterwards-flink1.2-snapshot"))); - harness.open(); - - harness.processElement(new StreamRecord<>(startEvent1, 5)); - - harness.processWatermark(new Watermark(20)); - - ConcurrentLinkedQueue<Object> result = harness.getOutput(); - - // watermark and the result - assertEquals(2, result.size()); - - Object resultObject = result.poll(); - assertTrue(resultObject instanceof StreamRecord); - StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject; - assertTrue(resultRecord.getValue() instanceof Map); - - @SuppressWarnings("unchecked") - Map<String, List<Event>> patternMap = - (Map<String, List<Event>>) resultRecord.getValue(); - - assertEquals(startEvent1, patternMap.get("start").get(0)); - } finally { - harness.close(); - } - } - - private static class SinglePatternNFAFactory implements NFACompiler.NFAFactory<Event> { - - private static final long serialVersionUID = 1173020762472766713L; - - private final boolean handleTimeout; - - private SinglePatternNFAFactory() { - this(false); - } - - private SinglePatternNFAFactory(boolean handleTimeout) { - this.handleTimeout = handleTimeout; - } - - @Override - public NFA<Event> createNFA() { - - Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter()) - .within(Time.milliseconds(10L)); - - return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout); - } - } - - private static class NFAFactory implements NFACompiler.NFAFactory<Event> { - - private static final long serialVersionUID = 1173020762472766713L; - - private final boolean handleTimeout; - - private NFAFactory() { - this(false); - } - - private NFAFactory(boolean handleTimeout) { - this.handleTimeout = handleTimeout; - } - - @Override - public NFA<Event> createNFA() { - - Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter()) - .followedBy("middle") - .subtype(SubEvent.class) - .where(new MiddleFilter()) - .followedBy("end") - .where(new EndFilter()) - // add a window timeout to test whether timestamps of elements in the - // priority queue in CEP operator are correctly checkpointed/restored - .within(Time.milliseconds(10L)); - - return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout); - } - } - - private static class StartFilter extends SimpleCondition<Event> { - private static final long serialVersionUID = 5726188262756267490L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("start"); - } - } - - private static class MiddleFilter extends SimpleCondition<SubEvent> { - private static final long serialVersionUID = 6215754202506583964L; - - @Override - public boolean filter(SubEvent value) throws Exception { - return value.getVolume() > 5.0; - } - } - - private static class EndFilter extends SimpleCondition<Event> { - private static final long serialVersionUID = 7056763917392056548L; - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("end"); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/736e7722/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java new file mode 100644 index 0000000..2682d76 --- /dev/null +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java @@ -0,0 +1,635 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.SubEvent; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; +import org.apache.flink.streaming.util.migration.MigrationTestUtil; +import org.apache.flink.streaming.util.migration.MigrationVersion; + +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for checking whether CEP operator can restore from snapshots that were done + * using previous Flink versions. + * + * <p>For regenerating the binary snapshot file of previous versions you have to run the + * {@code write*()} method on the corresponding Flink release-* branch. + */ +@RunWith(Parameterized.class) +public class CEPMigrationTest { + + /** + * TODO change this to the corresponding savepoint version to be written (e.g. {@link MigrationVersion#v1_3} for 1.3) + * TODO and remove all @Ignore annotations on write*Snapshot() methods to generate savepoints + */ + private final MigrationVersion flinkGenerateSavepointVersion = null; + + private final MigrationVersion migrateVersion; + + @Parameterized.Parameters(name = "Migration Savepoint: {0}") + public static Collection<MigrationVersion> parameters () { + return Arrays.asList(MigrationVersion.v1_2, MigrationVersion.v1_3); + } + + public CEPMigrationTest(MigrationVersion migrateVersion) { + this.migrateVersion = migrateVersion; + } + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeAfterBranchingPatternSnapshot() throws Exception { + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent = new Event(42, "start", 1.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); + + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + try { + harness.setup(); + harness.open(); + + harness.processElement(new StreamRecord<Event>(startEvent, 1)); + harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); + harness + .processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + harness.processElement(new StreamRecord<Event>(middleEvent1, 2)); + harness.processElement(new StreamRecord<Event>(middleEvent2, 3)); + + harness.processWatermark(new Watermark(5)); + + // do snapshot and save to file + OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSnapshotUtil.writeStateHandle(snapshot, + "src/test/resources/cep-migration-after-branching-flink" + flinkGenerateSavepointVersion + "-snapshot"); + } finally { + harness.close(); + } + } + + @Test + public void testRestoreAfterBranchingPattern() throws Exception { + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent = new Event(42, "start", 1.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); + final Event endEvent = new Event(42, "end", 1.0); + + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + try { + harness.setup(); + + MigrationTestUtil.restoreFromSnapshot( + harness, + OperatorSnapshotUtil.getResourceFilename("cep-migration-after-branching-flink" + migrateVersion + "-snapshot"), + migrateVersion); + + harness.open(); + + harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4)); + harness.processElement(new StreamRecord<>(endEvent, 5)); + + harness.processWatermark(new Watermark(20)); + + ConcurrentLinkedQueue<Object> result = harness.getOutput(); + + // watermark and 2 results + assertEquals(3, result.size()); + + Object resultObject1 = result.poll(); + assertTrue(resultObject1 instanceof StreamRecord); + StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1; + assertTrue(resultRecord1.getValue() instanceof Map); + + Object resultObject2 = result.poll(); + assertTrue(resultObject2 instanceof StreamRecord); + StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2; + assertTrue(resultRecord2.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, List<Event>> patternMap1 = + (Map<String, List<Event>>) resultRecord1.getValue(); + + assertEquals(startEvent, patternMap1.get("start").get(0)); + assertEquals(middleEvent1, patternMap1.get("middle").get(0)); + assertEquals(endEvent, patternMap1.get("end").get(0)); + + @SuppressWarnings("unchecked") + Map<String, List<Event>> patternMap2 = + (Map<String, List<Event>>) resultRecord2.getValue(); + + assertEquals(startEvent, patternMap2.get("start").get(0)); + assertEquals(middleEvent2, patternMap2.get("middle").get(0)); + assertEquals(endEvent, patternMap2.get("end").get(0)); + + // and now go for a checkpoint with the new serializers + + final Event startEvent1 = new Event(42, "start", 2.0); + final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0); + final Event endEvent1 = new Event(42, "end", 2.0); + + harness.processElement(new StreamRecord<Event>(startEvent1, 21)); + harness.processElement(new StreamRecord<Event>(middleEvent3, 23)); + + // simulate snapshot/restore with some elements in internal sorting queue + OperatorStateHandles snapshot = harness.snapshot(1L, 1L); + harness.close(); + + harness = new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(endEvent1, 25)); + + harness.processWatermark(new Watermark(50)); + + result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject3 = result.poll(); + assertTrue(resultObject3 instanceof StreamRecord); + StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3; + assertTrue(resultRecord3.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, List<Event>> patternMap3 = + (Map<String, List<Event>>) resultRecord3.getValue(); + + assertEquals(startEvent1, patternMap3.get("start").get(0)); + assertEquals(middleEvent3, patternMap3.get("middle").get(0)); + assertEquals(endEvent1, patternMap3.get("end").get(0)); + } finally { + harness.close(); + } + } + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeStartingNewPatternAfterMigrationSnapshot() throws Exception { + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent1 = new Event(42, "start", 1.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + try { + harness.setup(); + harness.open(); + harness.processElement(new StreamRecord<Event>(startEvent1, 1)); + harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2)); + harness + .processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + harness.processElement(new StreamRecord<Event>(middleEvent1, 2)); + harness.processWatermark(new Watermark(5)); + + // do snapshot and save to file + OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSnapshotUtil.writeStateHandle(snapshot, + "src/test/resources/cep-migration-starting-new-pattern-flink" + flinkGenerateSavepointVersion + "-snapshot"); + } finally { + harness.close(); + } + } + + @Test + public void testRestoreStartingNewPatternAfterMigration() throws Exception { + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent1 = new Event(42, "start", 1.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0); + final Event startEvent2 = new Event(42, "start", 5.0); + final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0); + final Event endEvent = new Event(42, "end", 1.0); + + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + try { + harness.setup(); + + MigrationTestUtil.restoreFromSnapshot( + harness, + OperatorSnapshotUtil.getResourceFilename("cep-migration-starting-new-pattern-flink" + migrateVersion + "-snapshot"), + migrateVersion); + + harness.open(); + + harness.processElement(new StreamRecord<>(startEvent2, 5)); + harness.processElement(new StreamRecord<Event>(middleEvent2, 6)); + harness.processElement(new StreamRecord<>(endEvent, 7)); + + harness.processWatermark(new Watermark(20)); + + ConcurrentLinkedQueue<Object> result = harness.getOutput(); + + // watermark and 3 results + assertEquals(4, result.size()); + + Object resultObject1 = result.poll(); + assertTrue(resultObject1 instanceof StreamRecord); + StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1; + assertTrue(resultRecord1.getValue() instanceof Map); + + Object resultObject2 = result.poll(); + assertTrue(resultObject2 instanceof StreamRecord); + StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2; + assertTrue(resultRecord2.getValue() instanceof Map); + + Object resultObject3 = result.poll(); + assertTrue(resultObject3 instanceof StreamRecord); + StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3; + assertTrue(resultRecord3.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, List<Event>> patternMap1 = + (Map<String, List<Event>>) resultRecord1.getValue(); + + assertEquals(startEvent1, patternMap1.get("start").get(0)); + assertEquals(middleEvent1, patternMap1.get("middle").get(0)); + assertEquals(endEvent, patternMap1.get("end").get(0)); + + @SuppressWarnings("unchecked") + Map<String, List<Event>> patternMap2 = + (Map<String, List<Event>>) resultRecord2.getValue(); + + assertEquals(startEvent1, patternMap2.get("start").get(0)); + assertEquals(middleEvent2, patternMap2.get("middle").get(0)); + assertEquals(endEvent, patternMap2.get("end").get(0)); + + @SuppressWarnings("unchecked") + Map<String, List<Event>> patternMap3 = + (Map<String, List<Event>>) resultRecord3.getValue(); + + assertEquals(startEvent2, patternMap3.get("start").get(0)); + assertEquals(middleEvent2, patternMap3.get("middle").get(0)); + assertEquals(endEvent, patternMap3.get("end").get(0)); + + // and now go for a checkpoint with the new serializers + + final Event startEvent3 = new Event(42, "start", 2.0); + final SubEvent middleEvent3 = new SubEvent(42, "foo", 1.0, 11.0); + final Event endEvent1 = new Event(42, "end", 2.0); + + harness.processElement(new StreamRecord<Event>(startEvent3, 21)); + harness.processElement(new StreamRecord<Event>(middleEvent3, 23)); + + // simulate snapshot/restore with some elements in internal sorting queue + OperatorStateHandles snapshot = harness.snapshot(1L, 1L); + harness.close(); + + harness = new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(endEvent1, 25)); + + harness.processWatermark(new Watermark(50)); + + result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject4 = result.poll(); + assertTrue(resultObject4 instanceof StreamRecord); + StreamRecord<?> resultRecord4 = (StreamRecord<?>) resultObject4; + assertTrue(resultRecord4.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, List<Event>> patternMap4 = + (Map<String, List<Event>>) resultRecord4.getValue(); + + assertEquals(startEvent3, patternMap4.get("start").get(0)); + assertEquals(middleEvent3, patternMap4.get("middle").get(0)); + assertEquals(endEvent1, patternMap4.get("end").get(0)); + } finally { + harness.close(); + } + } + + /** + * Manually run this to write binary snapshot data. + */ + @Ignore + @Test + public void writeSinglePatternAfterMigrationSnapshot() throws Exception { + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent1 = new Event(42, "start", 1.0); + + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new SinglePatternNFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + try { + harness.setup(); + harness.open(); + harness.processWatermark(new Watermark(5)); + + // do snapshot and save to file + OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSnapshotUtil.writeStateHandle(snapshot, + "src/test/resources/cep-migration-single-pattern-afterwards-flink" + flinkGenerateSavepointVersion + "-snapshot"); + } finally { + harness.close(); + } + } + + @Test + public void testSinglePatternAfterMigration() throws Exception { + + KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + final Event startEvent1 = new Event(42, "start", 1.0); + + OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = + new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new SinglePatternNFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + try { + harness.setup(); + + MigrationTestUtil.restoreFromSnapshot( + harness, + OperatorSnapshotUtil.getResourceFilename("cep-migration-single-pattern-afterwards-flink" + migrateVersion + "-snapshot"), + migrateVersion); + + harness.open(); + + harness.processElement(new StreamRecord<>(startEvent1, 5)); + + harness.processWatermark(new Watermark(20)); + + ConcurrentLinkedQueue<Object> result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject = result.poll(); + assertTrue(resultObject instanceof StreamRecord); + StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject; + assertTrue(resultRecord.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<String, List<Event>> patternMap = + (Map<String, List<Event>>) resultRecord.getValue(); + + assertEquals(startEvent1, patternMap.get("start").get(0)); + } finally { + harness.close(); + } + } + + private static class SinglePatternNFAFactory implements NFACompiler.NFAFactory<Event> { + + private static final long serialVersionUID = 1173020762472766713L; + + private final boolean handleTimeout; + + private SinglePatternNFAFactory() { + this(false); + } + + private SinglePatternNFAFactory(boolean handleTimeout) { + this.handleTimeout = handleTimeout; + } + + @Override + public NFA<Event> createNFA() { + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter()) + .within(Time.milliseconds(10L)); + + return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout); + } + } + + private static class NFAFactory implements NFACompiler.NFAFactory<Event> { + + private static final long serialVersionUID = 1173020762472766713L; + + private final boolean handleTimeout; + + private NFAFactory() { + this(false); + } + + private NFAFactory(boolean handleTimeout) { + this.handleTimeout = handleTimeout; + } + + @Override + public NFA<Event> createNFA() { + + Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter()) + .followedByAny("middle") + .subtype(SubEvent.class) + .where(new MiddleFilter()) + .followedByAny("end") + .where(new EndFilter()) + // add a window timeout to test whether timestamps of elements in the + // priority queue in CEP operator are correctly checkpointed/restored + .within(Time.milliseconds(10L)); + + return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout); + } + } + + private static class StartFilter extends SimpleCondition<Event> { + private static final long serialVersionUID = 5726188262756267490L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("start"); + } + } + + private static class MiddleFilter extends SimpleCondition<SubEvent> { + private static final long serialVersionUID = 6215754202506583964L; + + @Override + public boolean filter(SubEvent value) throws Exception { + return value.getVolume() > 5.0; + } + } + + private static class EndFilter extends SimpleCondition<Event> { + private static final long serialVersionUID = 7056763917392056548L; + + @Override + public boolean filter(Event value) throws Exception { + return value.getName().equals("end"); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/736e7722/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot index 6775f2a..4eb4e44 100644 Binary files a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot and b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.2-snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/736e7722/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.3-snapshot ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.3-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.3-snapshot new file mode 100644 index 0000000..bee7aeb Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.3-snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/736e7722/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot index f63b7dd..64e973e 100644 Binary files a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot and b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.2-snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/736e7722/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.3-snapshot ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.3-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.3-snapshot new file mode 100644 index 0000000..faa0bf9 Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.3-snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/736e7722/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot index 8e0fd27..5e48241 100644 Binary files a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot and b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.2-snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/736e7722/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.3-snapshot ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.3-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.3-snapshot new file mode 100644 index 0000000..1a2f388 Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.3-snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/736e7722/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java new file mode 100644 index 0000000..f723b34 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationTestUtil.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.migration; + +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; + +/** + * Utility methods for testing snapshot migrations. + */ +public class MigrationTestUtil { + + /** + * Restore from a snapshot taken with an older Flink version. + * + * @param testHarness the test harness to restore the snapshot to. + * @param snapshotPath the absolute path to the snapshot. + * @param snapshotFlinkVersion the Flink version of the snapshot. + * + * @throws Exception + */ + public static void restoreFromSnapshot( + AbstractStreamOperatorTestHarness<?> testHarness, + String snapshotPath, + MigrationVersion snapshotFlinkVersion) throws Exception { + + if (snapshotFlinkVersion == MigrationVersion.v1_1) { + // Flink 1.1 snapshots should be read using the legacy restore method + testHarness.initializeStateFromLegacyCheckpoint(snapshotPath); + } else { + testHarness.initializeState(OperatorSnapshotUtil.readStateHandle(snapshotPath)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/736e7722/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java new file mode 100644 index 0000000..3e7998d --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/migration/MigrationVersion.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.migration; + +/** + * Enumeration for Flink versions, used in migration integration tests + * to indicate the migrated snapshot version. + */ +public enum MigrationVersion { + + // NOTE: the version strings must not change, + // as they are used to locate snapshot file paths + v1_1("1.1"), + v1_2("1.2"), + v1_3("1.3"); + + private String versionStr; + + MigrationVersion(String versionStr) { + this.versionStr = versionStr; + } + + @Override + public String toString() { + return versionStr; + } +}