[
https://issues.apache.org/jira/browse/FLINK-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930027#comment-15930027
]
ASF GitHub Bot commented on FLINK-5846:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3445#discussion_r106659109
--- Diff:
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
---
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import
org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CEPMigration11to13Test {
+
+ private static String getResourceFilename(String filename) {
+ ClassLoader cl = CEPMigration11to13Test.class.getClassLoader();
+ URL resource = cl.getResource(filename);
+ if (resource == null) {
+ throw new NullPointerException("Missing snapshot
resource.");
+ }
+ return resource.getFile();
+ }
+
+ @Test
+ public void testKeyedCEPOperatorMigratation() throws Exception {
+
+ KeySelector<Event, Integer> keySelector = new
KeySelector<Event, Integer>() {
+ private static final long serialVersionUID =
-4873366487571254798L;
+
+ @Override
+ public Integer getKey(Event value) throws Exception {
+ return value.getId();
+ }
+ };
+
+ final Event startEvent = new Event(42, "start", 1.0);
+ final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
+ final Event endEvent = new Event(42, "end", 1.0);
+
+ // uncomment these lines for regenerating the snapshot on Flink
1.1
+ /*
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>>
harness = new OneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ IntSerializer.INSTANCE,
+ new NFAFactory()));
+ harness.configureForKeyedStream(keySelector,
BasicTypeInfo.INT_TYPE_INFO);
+ harness.open();
+ harness.processElement(new StreamRecord<Event>(startEvent, 1));
+ harness.processElement(new StreamRecord<Event>(new Event(42,
"foobar", 1.0), 2));
+ harness.processElement(new StreamRecord<Event>(new SubEvent(42,
"barfoo", 1.0, 5.0), 3));
+ harness.processWatermark(new Watermark(2));
+ // simulate snapshot/restore with empty element queue but NFA
state
+ StreamTaskState snapshot = harness.snapshot(1, 1);
+ FileOutputStream out = new FileOutputStream(
--- End diff --
The `-old` seems to be a leftover?
> CEP: make the operators backwards compatible.
> ---------------------------------------------
>
> Key: FLINK-5846
> URL: https://issues.apache.org/jira/browse/FLINK-5846
> Project: Flink
> Issue Type: Sub-task
> Components: CEP
> Affects Versions: 1.3.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> This targets making the new CEP operators compatible with their previous
> versions from Flink 1.1 and Flink 1.2.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)