[FLINK-6255] [cep] Remove PatternStream.getSideOutput().

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05ad87f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05ad87f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05ad87f4

Branch: refs/heads/master
Commit: 05ad87f4ce8c0aea6944feb14bf19795c1fc56c9
Parents: 02ea418
Author: kl0u <[email protected]>
Authored: Fri May 12 16:01:38 2017 +0200
Committer: kkloudas <[email protected]>
Committed: Wed May 17 14:37:32 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            | 41 +-------
 .../apache/flink/cep/scala/PatternStream.scala  | 35 +------
 .../org/apache/flink/cep/PatternStream.java     | 55 +----------
 .../AbstractKeyedCEPPatternOperator.java        | 24 -----
 .../flink/cep/operator/CEPOperatorUtils.java    |  9 +-
 .../cep/operator/KeyedCEPPatternOperator.java   |  4 +-
 .../TimeoutKeyedCEPPatternOperator.java         |  4 +-
 .../java/org/apache/flink/cep/CEPITCase.java    | 98 +-------------------
 .../cep/operator/CEPFrom12MigrationTest.java    |  6 --
 .../cep/operator/CEPMigration11to13Test.java    |  2 -
 .../flink/cep/operator/CEPOperatorTest.java     |  2 -
 .../flink/cep/operator/CEPRescalingTest.java    |  1 -
 12 files changed, 13 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index b379615..58e1a0a 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -806,46 +806,7 @@ in event time.
 
 To also guarantee that elements across watermarks are processed in event-time 
order, Flink's CEP library assumes 
 *correctness of the watermark*, and considers as *late* elements whose 
timestamp is smaller than that of the last 
-seen watermark. Late elements are not further processed but they can be 
redirected to a [side output]
-({{ site.baseurl }}/dev/stream/side_output.html) dedicated to them.
-
-To access the stream of late elements, you first need to specify that you want 
to get the late data using 
-`.sideOutputLateData(OutputTag)` on the `PatternStream` returned using the 
`CEP.pattern(...)` call. If you do not do
-so, the late elements will be silently dropped. Then, you can get the 
side-output stream using the 
-`.getSideOutput(OutputTag)` on the aforementioned `PatternStream`, and 
providing as argument the output tag used in 
-the `.sideOutputLateData(OutputTag)`:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
-
-PatternStream<T> patternStream = CEP.pattern(...)
-    .sideOutputLateData(lateOutputTag);
-
-// main output with matches
-DataStream<O> result = patternStream.select(...)    
-
-// side output containing the late events
-DataStream<T> lateStream = patternStream.getSideOutput(lateOutputTag);
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val lateOutputTag = OutputTag[T]("late-data")
-
-val patternStream: PatternStream[T] = CEP.pattern(...)
-    .sideOutputLateData(lateOutputTag)
-
-// main output with matches
-val result = patternStream.select(...)
-
-// side output containing the late events
-val lateStream = patternStream.getSideOutput(lateOutputTag)
-{% endhighlight %}
-</div>
-</div>
+seen watermark. Late elements are not further processed.
 
 ## Examples
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index d4bc28c..e71439c 100644
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -24,12 +24,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.cep.{PatternFlatSelectFunction, 
PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, 
PatternStream => JPatternStream}
 import org.apache.flink.cep.pattern.{Pattern => JPattern}
 import org.apache.flink.streaming.api.scala.{asScalaStream, _}
-import org.apache.flink.util.{Collector, OutputTag}
+import org.apache.flink.util.Collector
 import org.apache.flink.types.{Either => FEither}
 import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
 import java.lang.{Long => JLong}
 
-import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.cep.operator.CEPOperatorUtils
 import org.apache.flink.cep.scala.pattern.Pattern
 
@@ -47,23 +46,8 @@ import scala.collection.mutable
   */
 class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
-  private[flink] var lateDataOutputTag: OutputTag[T] = null
-
   private[flink] def wrappedPatternStream = jPatternStream
 
-
-  /**
-    * Send late arriving data to the side output identified by the given 
{@link OutputTag}. The
-    * CEP library assumes correctness of the watermark, so an element is 
considered late if its
-    * timestamp is smaller than the last received watermark.
-    */
-  @PublicEvolving
-  def sideOutputLateData(outputTag: OutputTag[T]): PatternStream[T] = {
-    jPatternStream.sideOutputLateData(outputTag)
-    lateDataOutputTag = outputTag
-    this
-  }
-
   def getPattern: Pattern[T, T] = 
Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])
 
   def getInputStream: DataStream[T] = 
asScalaStream(jPatternStream.getInputStream())
@@ -110,8 +94,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
     val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
       jPatternStream.getInputStream(),
-      jPatternStream.getPattern(),
-      lateDataOutputTag)
+      jPatternStream.getPattern())
 
     val cleanedSelect = cleanClosure(patternSelectFunction)
     val cleanedTimeout = cleanClosure(patternTimeoutFunction)
@@ -176,8 +159,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   : DataStream[Either[L, R]] = {
     val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
       jPatternStream.getInputStream(),
-      jPatternStream.getPattern(),
-      lateDataOutputTag
+      jPatternStream.getPattern()
     )
 
     val cleanedSelect = cleanClosure(patternFlatSelectFunction)
@@ -338,17 +320,6 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
     flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
   }
-
-  /**
-    * Gets the {@link DataStream} that contains the elements that are emitted 
from an operation
-    * into the side output with the given {@link OutputTag}.
-    *
-    * @param tag The tag identifying a specific side output.
-    */
-    @PublicEvolving
-    def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X] = {
-      asScalaStream(jPatternStream.getSideOutput(tag))
-    }
 }
 
 object PatternStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 04dff49..5544689 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -30,8 +30,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 import java.util.Map;
@@ -54,19 +52,6 @@ public class PatternStream<T> {
 
        private final Pattern<T, ?> pattern;
 
-       /**
-        * A reference to the created pattern stream used to get
-        * the registered side outputs, e.g late elements side output.
-        */
-       private SingleOutputStreamOperator<?> patternStream;
-
-       /**
-        * {@link OutputTag} to use for late arriving events. Elements for which
-        * {@code window.maxTimestamp + allowedLateness} is smaller than the 
current watermark will
-        * be emitted to this.
-        */
-       private OutputTag<T> lateDataOutputTag;
-
        PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> 
pattern) {
                this.inputStream = inputStream;
                this.pattern = pattern;
@@ -81,22 +66,6 @@ public class PatternStream<T> {
        }
 
        /**
-        * Send late arriving data to the side output identified by the given 
{@link OutputTag}. The
-        * CEP library assumes correctness of the watermark, so an element is 
considered late if its
-        * timestamp is smaller than the last received watermark.
-        */
-       public PatternStream<T> sideOutputLateData(OutputTag<T> outputTag) {
-               Preconditions.checkNotNull(outputTag, "Side output tag must not 
be null.");
-               Preconditions.checkArgument(lateDataOutputTag == null,
-                               "The late side output tag has already been 
initialized to " + lateDataOutputTag + ".");
-               Preconditions.checkArgument(patternStream == null,
-                               "The late side output tag has to be set before 
calling select() or flatSelect().");
-
-               this.lateDataOutputTag = 
inputStream.getExecutionEnvironment().clean(outputTag);
-               return this;
-       }
-
-       /**
         * Applies a select function to the detected pattern sequence. For each 
pattern sequence the
         * provided {@link PatternSelectFunction} is called. The pattern select 
function can produce
         * exactly one resulting element.
@@ -137,8 +106,7 @@ public class PatternStream<T> {
         */
        public <R> SingleOutputStreamOperator<R> select(final 
PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> 
outTypeInfo) {
                SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
-                               
CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
-               this.patternStream = patternStream;
+                               
CEPOperatorUtils.createPatternStream(inputStream, pattern);
 
                return patternStream.map(
                        new PatternSelectMapper<>(
@@ -169,8 +137,7 @@ public class PatternStream<T> {
                final PatternSelectFunction<T, R> patternSelectFunction) {
 
                SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, 
Long>, Map<String, List<T>>>> patternStream =
-                               
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, 
lateDataOutputTag);
-               this.patternStream = patternStream;
+                               
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
 
                TypeInformation<L> leftTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
                        patternTimeoutFunction,
@@ -240,8 +207,7 @@ public class PatternStream<T> {
         */
        public <R> SingleOutputStreamOperator<R> flatSelect(final 
PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> 
outTypeInfo) {
                SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
-                               
CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
-               this.patternStream = patternStream;
+                               
CEPOperatorUtils.createPatternStream(inputStream, pattern);
 
                return patternStream.flatMap(
                        new PatternFlatSelectMapper<>(
@@ -273,8 +239,7 @@ public class PatternStream<T> {
                final PatternFlatSelectFunction<T, R> 
patternFlatSelectFunction) {
 
                SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, 
Long>, Map<String, List<T>>>> patternStream =
-                               
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, 
lateDataOutputTag);
-               this.patternStream = patternStream;
+                               
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
 
                TypeInformation<L> leftTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
                        patternFlatTimeoutFunction,
@@ -305,18 +270,6 @@ public class PatternStream<T> {
        }
 
        /**
-        * Gets the {@link DataStream} that contains the elements that are 
emitted from an operation
-        * into the side output with the given {@link OutputTag}.
-        *
-        * @param sideOutputTag The tag identifying a specific side output.
-        */
-       public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
-               Preconditions.checkNotNull(patternStream, "The operator has not 
been initialized. " +
-                               "To have the late element side output, you have 
to first define the main output using select() or flatSelect().");
-               return patternStream.getSideOutput(sideOutputTag);
-       }
-
-       /**
         * Wrapper for a {@link PatternSelectFunction}.
         *
         * @param <T> Type of the input elements

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 3afe397..7068bc4 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -47,7 +47,6 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Migration;
-import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -99,13 +98,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
        private transient InternalTimerService<VoidNamespace> timerService;
 
        /**
-        * {@link OutputTag} to use for late arriving events. Elements for which
-        * {@code window.maxTimestamp + allowedLateness} is smaller than the 
current watermark will
-        * be emitted to this.
-        */
-       private final OutputTag<IN> lateDataOutputTag;
-
-       /**
         * The last seen watermark. This will be used to
         * decide if an incoming element is late or not.
         */
@@ -123,7 +115,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                        final KeySelector<IN, KEY> keySelector,
                        final TypeSerializer<KEY> keySerializer,
                        final NFACompiler.NFAFactory<IN> nfaFactory,
-                       final OutputTag<IN> lateDataOutputTag,
                        final boolean migratingFromOldKeyedOperator) {
 
                this.inputSerializer = 
Preconditions.checkNotNull(inputSerializer);
@@ -132,7 +123,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
                this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
 
-               this.lateDataOutputTag = lateDataOutputTag;
                this.migratingFromOldKeyedOperator = 
migratingFromOldKeyedOperator;
        }
 
@@ -203,8 +193,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                                        priorityQueue.offer(element);
                                }
                                updatePriorityQueue(priorityQueue);
-                       } else {
-                               sideOutputLateElement(element);
                        }
                }
        }
@@ -266,18 +254,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                this.lastWatermark = timestamp;
        }
 
-       /**
-        * Puts the provided late element in the dedicated side output,
-        * if the user has specified one.
-        *
-        * @param element The late element.
-        */
-       private void sideOutputLateElement(StreamRecord<IN> element) {
-               if (lateDataOutputTag != null) {
-                       output.collect(lateDataOutputTag, element);
-               }
-       }
-
        private NFA<IN> getNFA() throws IOException {
                NFA<IN> nfa = nfaOperatorState.value();
                return nfa != null ? nfa : nfaFactory.createNFA();

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 065c244..08424a4 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.types.Either;
-import org.apache.flink.util.OutputTag;
 
 import java.util.List;
 import java.util.Map;
@@ -49,7 +48,7 @@ public class CEPOperatorUtils {
         * @return Data stream containing fully matched event sequences stored 
in a {@link Map}. The
         * events are indexed by their associated names of the pattern.
         */
-       public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> 
createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern, 
OutputTag<T> lateDataOutputTag) {
+       public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> 
createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) {
                final TypeSerializer<T> inputSerializer = 
inputStream.getType().createSerializer(inputStream.getExecutionConfig());
 
                // check whether we use processing time
@@ -76,7 +75,6 @@ public class CEPOperatorUtils {
                                        keySelector,
                                        keySerializer,
                                        nfaFactory,
-                                       lateDataOutputTag,
                                        true));
                } else {
 
@@ -92,7 +90,6 @@ public class CEPOperatorUtils {
                                        keySelector,
                                        keySerializer,
                                        nfaFactory,
-                                       lateDataOutputTag,
                                        false
                                )).forceNonParallel();
                }
@@ -110,7 +107,7 @@ public class CEPOperatorUtils {
         * a {@link Either} instance.
         */
        public static <K, T> 
SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, 
Map<String, List<T>>>> createTimeoutPatternStream(
-                       DataStream<T> inputStream, Pattern<T, ?> pattern, 
OutputTag<T> lateDataOutputTag) {
+                       DataStream<T> inputStream, Pattern<T, ?> pattern) {
 
                final TypeSerializer<T> inputSerializer = 
inputStream.getType().createSerializer(inputStream.getExecutionConfig());
 
@@ -142,7 +139,6 @@ public class CEPOperatorUtils {
                                        keySelector,
                                        keySerializer,
                                        nfaFactory,
-                                       lateDataOutputTag,
                                        true));
                } else {
 
@@ -158,7 +154,6 @@ public class CEPOperatorUtils {
                                        keySelector,
                                        keySerializer,
                                        nfaFactory,
-                                       lateDataOutputTag,
                                        false
                                )).forceNonParallel();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index f48f5c3..4d68afb 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.OutputTag;
 
 import java.util.Collection;
 import java.util.Iterator;
@@ -48,10 +47,9 @@ public class KeyedCEPPatternOperator<IN, KEY> extends 
AbstractKeyedCEPPatternOpe
                        KeySelector<IN, KEY> keySelector,
                        TypeSerializer<KEY> keySerializer,
                        NFACompiler.NFAFactory<IN> nfaFactory,
-                       OutputTag<IN> lateDataOutputTag,
                        boolean migratingFromOldKeyedOperator) {
 
-               super(inputSerializer, isProcessingTime, keySelector, 
keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
+               super(inputSerializer, isProcessingTime, keySelector, 
keySerializer, nfaFactory, migratingFromOldKeyedOperator);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 618a94d..9061bcb 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -25,7 +25,6 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Either;
-import org.apache.flink.util.OutputTag;
 
 import java.util.Collection;
 import java.util.List;
@@ -48,10 +47,9 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends 
AbstractKeyedCEPPat
                        KeySelector<IN, KEY> keySelector,
                        TypeSerializer<KEY> keySerializer,
                        NFACompiler.NFAFactory<IN> nfaFactory,
-                       OutputTag<IN> lateDataOutputTag,
                        boolean migratingFromOldKeyedOperator) {
 
-               super(inputSerializer, isProcessingTime, keySelector, 
keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
+               super(inputSerializer, isProcessingTime, keySelector, 
keySerializer, nfaFactory, migratingFromOldKeyedOperator);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index a6e925d..9a08659 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 
 import org.apache.flink.types.Either;
-import org.apache.flink.util.OutputTag;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -581,99 +580,4 @@ public class CEPITCase extends 
StreamingMultipleProgramsTestBase {
 
                env.execute();
        }
-
-       @Test
-       public void testLateEventSideOutput() throws Exception {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-               env.setParallelism(1);
-
-               // (Event, timestamp)
-               DataStream<Event> input = env.fromElements(
-                               Tuple2.of(new Event(1, "start", 1.0), 1L),
-                               Tuple2.of(new Event(2, "middle", 2.0), 2L),
-                               Tuple2.of(new Event(3, "end", 3.0), 15L),
-                               Tuple2.of(new Event(4, "middle", 5.0), 7L),
-                               Tuple2.of(new Event(6, "start", 1.0), 21L),
-                               Tuple2.of(new Event(5, "middle", 5.0), 10L),
-                               Tuple2.of(new Event(7, "middle", 2.0), 22L),
-                               Tuple2.of(new Event(8, "end", 3.0), 23L)
-               ).assignTimestampsAndWatermarks(new 
AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
-
-                       @Override
-                       public long extractTimestamp(Tuple2<Event, Long> 
element, long previousTimestamp) {
-                               return element.f1;
-                       }
-
-                       @Override
-                       public Watermark checkAndGetNextWatermark(Tuple2<Event, 
Long> lastElement, long extractedTimestamp) {
-                               return lastElement.f0.getName().equals("end") ? 
new Watermark(extractedTimestamp) : null;
-                       }
-
-               }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
-
-                       @Override
-                       public Event map(Tuple2<Event, Long> value) throws 
Exception {
-                               return value.f0;
-                       }
-               });
-
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("start");
-                       }
-               }).followedByAny("middle").where(new SimpleCondition<Event>() {
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("middle");
-                       }
-               }).followedByAny("end").where(new SimpleCondition<Event>() {
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("end");
-                       }
-               });
-
-               final OutputTag<Event> lateOutputTag = new 
OutputTag<Event>("late-data"){};
-
-               PatternStream<Event> patternStream = CEP.pattern(input, 
pattern).sideOutputLateData(lateOutputTag);
-               DataStream<String> result = patternStream.select(
-                               new PatternSelectFunction<Event, String>() {
-
-                                       @Override
-                                       public String select(Map<String, 
List<Event>> pattern) {
-                                               StringBuilder builder = new 
StringBuilder();
-
-                                               
builder.append(pattern.get("start").get(0).getId()).append(",")
-                                                               
.append(pattern.get("middle").get(0).getId()).append(",")
-                                                               
.append(pattern.get("end").get(0).getId());
-                                               return builder.toString();
-                                       }
-                               }
-               );
-
-               DataStream<Event> lateEvents = 
patternStream.getSideOutput(lateOutputTag);
-
-               // we just care for the late events in this test.
-               lateEvents.map(
-                               new MapFunction<Event, Integer>() {
-
-                                       @Override
-                                       public Integer map(Event value) throws 
Exception {
-                                               return value.getId();
-                                       }
-                               }
-               ).writeAsText(lateEventPath, FileSystem.WriteMode.OVERWRITE);
-
-               // the expected sequence of late event ids
-               expectedLateEvents = "4\n5";
-
-               result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-               expected = "1,2,3\n1,2,8\n1,7,8\n6,7,8";
-               env.execute();
-       }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index afb3e7c..789d000 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -81,7 +81,6 @@ public class CEPFrom12MigrationTest {
                                                                keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
NFAFactory(),
-                                                               null,
                                                                true),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
@@ -129,7 +128,6 @@ public class CEPFrom12MigrationTest {
                                                                keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
NFAFactory(),
-                                                               null,
                                                                true),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
@@ -204,7 +202,6 @@ public class CEPFrom12MigrationTest {
                                                                keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
NFAFactory(),
-                                                               null,
                                                                true),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
@@ -250,7 +247,6 @@ public class CEPFrom12MigrationTest {
                                                                keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
NFAFactory(),
-                                                               null,
                                                                true),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
@@ -337,7 +333,6 @@ public class CEPFrom12MigrationTest {
                                                                keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
SinglePatternNFAFactory(),
-                                                               null,
                                                                true),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
@@ -376,7 +371,6 @@ public class CEPFrom12MigrationTest {
                                                                keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
SinglePatternNFAFactory(),
-                                                               null,
                                                                true),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 404de54..e5719c5 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -104,7 +104,6 @@ public class CEPMigration11to13Test {
                                                                keySelector,
                                                                
IntSerializer.INSTANCE,
                                                                new 
NFAFactory(),
-                                                               null,
                                                                true),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
@@ -179,7 +178,6 @@ public class CEPMigration11to13Test {
                                                                keySelector,
                                                                
ByteSerializer.INSTANCE,
                                                                new 
NFAFactory(),
-                                                               null,
                                                                false),
                                                keySelector,
                                                BasicTypeInfo.BYTE_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 5ed8b46..74bddbb 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -227,7 +227,6 @@ public class CEPOperatorTest extends TestLogger {
                                keySelector,
                                IntSerializer.INSTANCE,
                                new NFAFactory(true),
-                               null,
                                true),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO);
@@ -487,7 +486,6 @@ public class CEPOperatorTest extends TestLogger {
                        keySelector,
                        IntSerializer.INSTANCE,
                        new NFAFactory(),
-                       null,
                        true);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 0210ef9..9eb8da2 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -347,7 +347,6 @@ public class CEPRescalingTest {
                                keySelector,
                                
BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                new NFAFactory(),
-                               null,
                                true),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO,

Reply via email to