[hotfix] [cep] Remove unused keySelector in operator.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34a6020f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34a6020f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34a6020f

Branch: refs/heads/release-1.3
Commit: 34a6020ff6d51cd8148366677fadb9317cfaa153
Parents: 36df901
Author: kkloudas <[email protected]>
Authored: Mon May 15 14:49:00 2017 +0200
Committer: kkloudas <[email protected]>
Committed: Wed May 17 14:40:26 2017 +0200

----------------------------------------------------------------------
 .../cep/operator/AbstractKeyedCEPPatternOperator.java   |  6 ------
 .../org/apache/flink/cep/operator/CEPOperatorUtils.java |  6 ------
 .../flink/cep/operator/KeyedCEPPatternOperator.java     |  4 +---
 .../cep/operator/TimeoutKeyedCEPPatternOperator.java    |  4 +---
 .../flink/cep/operator/CEPFrom12MigrationTest.java      |  6 ------
 .../flink/cep/operator/CEPMigration11to13Test.java      |  2 --
 .../org/apache/flink/cep/operator/CEPOperatorTest.java  | 12 ++++--------
 .../org/apache/flink/cep/operator/CEPRescalingTest.java |  1 -
 8 files changed, 6 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/34a6020f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7068bc4..bac21b3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -78,9 +77,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 
        private final TypeSerializer<IN> inputSerializer;
 
-       // necessary to extract the key from the input elements
-       private final KeySelector<IN, KEY> keySelector;
-
        // necessary to serialize the set of seen keys
        private final TypeSerializer<KEY> keySerializer;
 
@@ -112,14 +108,12 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
        public AbstractKeyedCEPPatternOperator(
                        final TypeSerializer<IN> inputSerializer,
                        final boolean isProcessingTime,
-                       final KeySelector<IN, KEY> keySelector,
                        final TypeSerializer<KEY> keySerializer,
                        final NFACompiler.NFAFactory<IN> nfaFactory,
                        final boolean migratingFromOldKeyedOperator) {
 
                this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
                this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime);
-               this.keySelector = Preconditions.checkNotNull(keySelector);
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
                this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34a6020f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 08424a4..e7b7e65 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -63,7 +63,6 @@ public class CEPOperatorUtils {
                        // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
                        KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) inputStream;
 
-                       KeySelector<T, K> keySelector = keyedStream.getKeySelector();
                        TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
 
                        patternStream = keyedStream.transform(
@@ -72,7 +71,6 @@ public class CEPOperatorUtils {
                                new KeyedCEPPatternOperator<>(
                                        inputSerializer,
                                        isProcessingTime,
-                                       keySelector,
                                        keySerializer,
                                        nfaFactory,
                                        true));
@@ -87,7 +85,6 @@ public class CEPOperatorUtils {
                                new KeyedCEPPatternOperator<>(
                                        inputSerializer,
                                        isProcessingTime,
-                                       keySelector,
                                        keySerializer,
                                        nfaFactory,
                                        false
@@ -127,7 +124,6 @@ public class CEPOperatorUtils {
                        // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
                        KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) inputStream;
 
-                       KeySelector<T, K> keySelector = keyedStream.getKeySelector();
                        TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
 
                        patternStream = keyedStream.transform(
@@ -136,7 +132,6 @@ public class CEPOperatorUtils {
                                new TimeoutKeyedCEPPatternOperator<>(
                                        inputSerializer,
                                        isProcessingTime,
-                                       keySelector,
                                        keySerializer,
                                        nfaFactory,
                                        true));
@@ -151,7 +146,6 @@ public class CEPOperatorUtils {
                                new TimeoutKeyedCEPPatternOperator<>(
                                        inputSerializer,
                                        isProcessingTime,
-                                       keySelector,
                                        keySerializer,
                                        nfaFactory,
                                        false

http://git-wip-us.apache.org/repos/asf/flink/blob/34a6020f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 4d68afb..fec226a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
@@ -44,12 +43,11 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
        public KeyedCEPPatternOperator(
                        TypeSerializer<IN> inputSerializer,
                        boolean isProcessingTime,
-                       KeySelector<IN, KEY> keySelector,
                        TypeSerializer<KEY> keySerializer,
                        NFACompiler.NFAFactory<IN> nfaFactory,
                        boolean migratingFromOldKeyedOperator) {
 
-               super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
+               super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/34a6020f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 9061bcb..5238878 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
@@ -44,12 +43,11 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
        public TimeoutKeyedCEPPatternOperator(
                        TypeSerializer<IN> inputSerializer,
                        boolean isProcessingTime,
-                       KeySelector<IN, KEY> keySelector,
                        TypeSerializer<KEY> keySerializer,
                        NFACompiler.NFAFactory<IN> nfaFactory,
                        boolean migratingFromOldKeyedOperator) {
 
-               super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
+               super(inputSerializer, isProcessingTime, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/34a6020f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index b0f47cc..d9efb1b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -78,7 +78,6 @@ public class CEPFrom12MigrationTest {
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
                                                                false,
-                                                               keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
NFAFactory(),
                                                                true),
@@ -126,7 +125,6 @@ public class CEPFrom12MigrationTest {
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
                                                                false,
-                                                               keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
NFAFactory(),
                                                                true),
@@ -200,7 +198,6 @@ public class CEPFrom12MigrationTest {
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
                                                                false,
-                                                               keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
NFAFactory(),
                                                                true),
@@ -246,7 +243,6 @@ public class CEPFrom12MigrationTest {
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
                                                                false,
-                                                               keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
NFAFactory(),
                                                                true),
@@ -332,7 +328,6 @@ public class CEPFrom12MigrationTest {
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
                                                                false,
-                                                               keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
SinglePatternNFAFactory(),
                                                                true),
@@ -371,7 +366,6 @@ public class CEPFrom12MigrationTest {
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
                                                                false,
-                                                               keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
SinglePatternNFAFactory(),
                                                                true),

http://git-wip-us.apache.org/repos/asf/flink/blob/34a6020f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 8a97448..88a5703 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -103,7 +103,6 @@ public class CEPMigration11to13Test {
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
                                                                false,
-                                                               keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
NFAFactory(),
                                                                true),
@@ -178,7 +177,6 @@ public class CEPMigration11to13Test {
                                                new KeyedCEPPatternOperator<>(
                                                                
Event.createTypeSerializer(),
                                                                false,
-                                                               keySelector,
                                                                
ByteSerializer.INSTANCE,
                                                                new 
NFAFactory(),
                                                                false),

http://git-wip-us.apache.org/repos/asf/flink/blob/34a6020f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 436ad52..eb50dfd 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -228,7 +228,6 @@ public class CEPOperatorTest extends TestLogger {
                        new TimeoutKeyedCEPPatternOperator<>(
                                Event.createTypeSerializer(),
                                false,
-                               keySelector,
                                IntSerializer.INSTANCE,
                                new NFAFactory(true),
                                true),
@@ -297,7 +296,7 @@ public class CEPOperatorTest extends TestLogger {
                Event startEventK2 = new Event(43, "start", 1.0);
 
                TestKeySelector keySelector = new TestKeySelector();
-               KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(false, keySelector);
+               KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(false);
                OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
 
                harness.open();
@@ -384,7 +383,6 @@ public class CEPOperatorTest extends TestLogger {
                KeyedCEPPatternOperator<Event, Integer> operator = new 
KeyedCEPPatternOperator<>(
                                Event.createTypeSerializer(),
                                false,
-                               keySelector,
                                IntSerializer.INSTANCE,
                                new ComplexNFAFactory(),
                                true);
@@ -475,7 +473,7 @@ public class CEPOperatorTest extends TestLogger {
                Event startEventK2 = new Event(43, "start", 1.0);
 
                TestKeySelector keySelector = new TestKeySelector();
-               KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(true, keySelector);
+               KeyedCEPPatternOperator<Event, Integer> operator = getKeyedCepOpearator(true);
                OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);
 
                harness.open();
@@ -555,7 +553,7 @@ public class CEPOperatorTest extends TestLogger {
                KeySelector<Event, Integer> keySelector = new TestKeySelector();
 
                return new KeyedOneInputStreamOperatorTestHarness<>(
-                       getKeyedCepOpearator(isProcessingTime, keySelector),
+                       getKeyedCepOpearator(isProcessingTime),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO);
        }
@@ -571,13 +569,11 @@ public class CEPOperatorTest extends TestLogger {
        }
 
        private KeyedCEPPatternOperator<Event, Integer> getKeyedCepOpearator(
-                       boolean isProcessingTime,
-                       KeySelector<Event, Integer> keySelector) {
+                       boolean isProcessingTime) {
 
                return new KeyedCEPPatternOperator<>(
                        Event.createTypeSerializer(),
                        isProcessingTime,
-                       keySelector,
                        IntSerializer.INSTANCE,
                        new NFAFactory(),
                        true);

http://git-wip-us.apache.org/repos/asf/flink/blob/34a6020f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 9eb8da2..45d7215 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -344,7 +344,6 @@ public class CEPRescalingTest {
                        new KeyedCEPPatternOperator<>(
                                Event.createTypeSerializer(),
                                false,
-                               keySelector,
                                
BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                new NFAFactory(),
                                true),

Reply via email to