[FLINK-3201] Adjust CEP code for changes state interfaces
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1198664c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1198664c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1198664c Branch: refs/heads/master Commit: 1198664cb35f377bbda3d9846cf6c35d232d813a Parents: 180cd3f Author: Stephan Ewen <[email protected]> Authored: Wed Feb 3 17:44:37 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Feb 3 20:28:41 2016 +0100 ---------------------------------------------------------------------- .../flink/cep/operator/CEPPatternOperator.java | 6 ++-- .../cep/operator/KeyedCEPPatternOperator.java | 33 +++++++++++--------- 2 files changed, 21 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1198664c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java index b11a6ab..153c9c9 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -93,12 +93,12 @@ public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN> { public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); - final StateBackend.CheckpointStateOutputStream os = this.getStateBackend().createCheckpointStateOutputStream( + final AbstractStateBackend.CheckpointStateOutputStream os = this.getStateBackend().createCheckpointStateOutputStream( checkpointId, timestamp); final ObjectOutputStream oos = new ObjectOutputStream(os); - final StateBackend.CheckpointStateOutputView ov = new StateBackend.CheckpointStateOutputView(os); + final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os); oos.writeObject(nfa); http://git-wip-us.apache.org/repos/asf/flink/blob/1198664c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java index 03758c7..5d754ce 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java @@ -18,7 +18,8 @@ package org.apache.flink.cep.operator; -import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; @@ -26,7 +27,7 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -70,8 +71,8 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator // TODO: fix once the state refactoring is completed private transient Set<KEY> keys; - private transient OperatorState<NFA<IN>> nfaOperatorState; - private transient OperatorState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState; + private transient ValueState<NFA<IN>> nfaOperatorState; + private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState; public KeyedCEPPatternOperator( TypeSerializer<IN> inputSerializer, @@ -95,19 +96,21 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator } if (nfaOperatorState == null) { - nfaOperatorState = this.createKeyValueState( - NFA_OPERATOR_STATE_NAME, - new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) NFA.class, getExecutionConfig()), - null); + nfaOperatorState = getPartitionedState( + new ValueStateDescriptor<NFA<IN>>( + NFA_OPERATOR_STATE_NAME, + new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) NFA.class, getExecutionConfig()), + null)); } if (priorityQueueOperatorState == null) { - priorityQueueOperatorState = this.createKeyValueState( - PRIORIRY_QUEUE_STATE_NAME, - new PriorityQueueSerializer<StreamRecord<IN>>( - new StreamRecordSerializer<IN>(getInputSerializer()), - new PriorityQueueStreamRecordFactory<IN>()), - null); + priorityQueueOperatorState = getPartitionedState( + new ValueStateDescriptor<PriorityQueue<StreamRecord<IN>>>( + PRIORIRY_QUEUE_STATE_NAME, + new PriorityQueueSerializer<StreamRecord<IN>>( + new StreamRecordSerializer<IN>(getInputSerializer()), + new PriorityQueueStreamRecordFactory<IN>()), + null)); } } @@ -166,7 +169,7 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); - StateBackend.CheckpointStateOutputView ov = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); + AbstractStateBackend.CheckpointStateOutputView ov = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); ov.writeInt(keys.size());
