[FLINK-3201] Adjust CEP code for changes state interfaces

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1198664c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1198664c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1198664c

Branch: refs/heads/master
Commit: 1198664cb35f377bbda3d9846cf6c35d232d813a
Parents: 180cd3f
Author: Stephan Ewen <[email protected]>
Authored: Wed Feb 3 17:44:37 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Feb 3 20:28:41 2016 +0100

----------------------------------------------------------------------
 .../flink/cep/operator/CEPPatternOperator.java  |  6 ++--
 .../cep/operator/KeyedCEPPatternOperator.java   | 33 +++++++++++---------
 2 files changed, 21 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1198664c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
index b11a6ab..153c9c9 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -93,12 +93,12 @@ public class CEPPatternOperator<IN> extends 
AbstractCEPPatternOperator<IN> {
        public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
                StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
 
-               final StateBackend.CheckpointStateOutputStream os = 
this.getStateBackend().createCheckpointStateOutputStream(
+               final AbstractStateBackend.CheckpointStateOutputStream os = 
this.getStateBackend().createCheckpointStateOutputStream(
                        checkpointId,
                        timestamp);
 
                final ObjectOutputStream oos = new ObjectOutputStream(os);
-               final StateBackend.CheckpointStateOutputView ov = new 
StateBackend.CheckpointStateOutputView(os);
+               final AbstractStateBackend.CheckpointStateOutputView ov = new 
AbstractStateBackend.CheckpointStateOutputView(os);
 
                oos.writeObject(nfa);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1198664c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 03758c7..5d754ce 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.cep.operator;
 
-import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
@@ -26,7 +27,7 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -70,8 +71,8 @@ public class KeyedCEPPatternOperator<IN, KEY> extends 
AbstractCEPPatternOperator
        // TODO: fix once the state refactoring is completed
        private transient Set<KEY> keys;
 
-       private transient OperatorState<NFA<IN>> nfaOperatorState;
-       private transient OperatorState<PriorityQueue<StreamRecord<IN>>> 
priorityQueueOperatorState;
+       private transient ValueState<NFA<IN>> nfaOperatorState;
+       private transient ValueState<PriorityQueue<StreamRecord<IN>>> 
priorityQueueOperatorState;
 
        public KeyedCEPPatternOperator(
                        TypeSerializer<IN> inputSerializer,
@@ -95,19 +96,21 @@ public class KeyedCEPPatternOperator<IN, KEY> extends 
AbstractCEPPatternOperator
                }
 
                if (nfaOperatorState == null) {
-                       nfaOperatorState = this.createKeyValueState(
-                               NFA_OPERATOR_STATE_NAME,
-                               new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) 
(Class<?>) NFA.class, getExecutionConfig()),
-                               null);
+                       nfaOperatorState = getPartitionedState(
+                                       new ValueStateDescriptor<NFA<IN>>(
+                                               NFA_OPERATOR_STATE_NAME,
+                                               new 
KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) NFA.class, 
getExecutionConfig()),
+                                               null));
                }
 
                if (priorityQueueOperatorState == null) {
-                       priorityQueueOperatorState = this.createKeyValueState(
-                               PRIORIRY_QUEUE_STATE_NAME,
-                               new PriorityQueueSerializer<StreamRecord<IN>>(
-                                       new 
StreamRecordSerializer<IN>(getInputSerializer()),
-                                       new 
PriorityQueueStreamRecordFactory<IN>()),
-                               null);
+                       priorityQueueOperatorState = getPartitionedState(
+                                       new 
ValueStateDescriptor<PriorityQueue<StreamRecord<IN>>>(
+                                               PRIORIRY_QUEUE_STATE_NAME,
+                                               new 
PriorityQueueSerializer<StreamRecord<IN>>(
+                                                       new 
StreamRecordSerializer<IN>(getInputSerializer()),
+                                                       new 
PriorityQueueStreamRecordFactory<IN>()),
+                                               null));
                }
        }
 
@@ -166,7 +169,7 @@ public class KeyedCEPPatternOperator<IN, KEY> extends 
AbstractCEPPatternOperator
        public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
                StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
 
-               StateBackend.CheckpointStateOutputView ov = 
getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+               AbstractStateBackend.CheckpointStateOutputView ov = 
getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
 
                ov.writeInt(keys.size());
 

Reply via email to