[FLINK-6255] [cep] Remove PatternStream.sideOutputLateData() and PatternStream.getSideOutput().
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4560d56c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4560d56c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4560d56c Branch: refs/heads/release-1.3 Commit: 4560d56c6fdface683116dc2db5d7a4942a8b6e3 Parents: 4f14e53 Author: kl0u <[email protected]> Authored: Fri May 12 16:01:38 2017 +0200 Committer: kkloudas <[email protected]> Committed: Wed May 17 14:40:25 2017 +0200 ---------------------------------------------------------------------- docs/dev/libs/cep.md | 41 +------- .../apache/flink/cep/scala/PatternStream.scala | 35 +------ .../org/apache/flink/cep/PatternStream.java | 55 +---------- .../AbstractKeyedCEPPatternOperator.java | 24 ----- .../flink/cep/operator/CEPOperatorUtils.java | 9 +- .../cep/operator/KeyedCEPPatternOperator.java | 4 +- .../TimeoutKeyedCEPPatternOperator.java | 4 +- .../java/org/apache/flink/cep/CEPITCase.java | 98 +------------------- .../cep/operator/CEPFrom12MigrationTest.java | 6 -- .../cep/operator/CEPMigration11to13Test.java | 2 - .../flink/cep/operator/CEPOperatorTest.java | 2 - .../flink/cep/operator/CEPRescalingTest.java | 1 - 12 files changed, 13 insertions(+), 268 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/docs/dev/libs/cep.md ---------------------------------------------------------------------- diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index b379615..58e1a0a 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -806,46 +806,7 @@ in event time. To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last -seen watermark. 
Late elements are not further processed but they can be redirected to a [side output] -({{ site.baseurl }}/dev/stream/side_output.html) dedicated to them. - -To access the stream of late elements, you first need to specify that you want to get the late data using -`.sideOutputLateData(OutputTag)` on the `PatternStream` returned using the `CEP.pattern(...)` call. If you do not do -so, the late elements will be silently dropped. Then, you can get the side-output stream using the -`.getSideOutput(OutputTag)` on the aforementioned `PatternStream`, and providing as argument the output tag used in -the `.sideOutputLateData(OutputTag)`: - -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} -final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){}; - -PatternStream<T> patternStream = CEP.pattern(...) - .sideOutputLateData(lateOutputTag); - -// main output with matches -DataStream<O> result = patternStream.select(...) - -// side output containing the late events -DataStream<T> lateStream = patternStream.getSideOutput(lateOutputTag); -{% endhighlight %} -</div> - -<div data-lang="scala" markdown="1"> -{% highlight scala %} -val lateOutputTag = OutputTag[T]("late-data") - -val patternStream: PatternStream[T] = CEP.pattern(...) - .sideOutputLateData(lateOutputTag) - -// main output with matches -val result = patternStream.select(...) - -// side output containing the late events -val lateStream = patternStream.getSideOutput(lateOutputTag) -{% endhighlight %} -</div> -</div> +seen watermark. Late elements are not further processed. 
## Examples http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala index d4bc28c..e71439c 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala @@ -24,12 +24,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream} import org.apache.flink.cep.pattern.{Pattern => JPattern} import org.apache.flink.streaming.api.scala.{asScalaStream, _} -import org.apache.flink.util.{Collector, OutputTag} +import org.apache.flink.util.Collector import org.apache.flink.types.{Either => FEither} import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2} import java.lang.{Long => JLong} -import org.apache.flink.annotation.PublicEvolving import org.apache.flink.cep.operator.CEPOperatorUtils import org.apache.flink.cep.scala.pattern.Pattern @@ -47,23 +46,8 @@ import scala.collection.mutable */ class PatternStream[T](jPatternStream: JPatternStream[T]) { - private[flink] var lateDataOutputTag: OutputTag[T] = null - private[flink] def wrappedPatternStream = jPatternStream - - /** - * Send late arriving data to the side output identified by the given {@link OutputTag}. The - * CEP library assumes correctness of the watermark, so an element is considered late if its - * timestamp is smaller than the last received watermark. 
- */ - @PublicEvolving - def sideOutputLateData(outputTag: OutputTag[T]): PatternStream[T] = { - jPatternStream.sideOutputLateData(outputTag) - lateDataOutputTag = outputTag - this - } - def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]]) def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream()) @@ -110,8 +94,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { val patternStream = CEPOperatorUtils.createTimeoutPatternStream( jPatternStream.getInputStream(), - jPatternStream.getPattern(), - lateDataOutputTag) + jPatternStream.getPattern()) val cleanedSelect = cleanClosure(patternSelectFunction) val cleanedTimeout = cleanClosure(patternTimeoutFunction) @@ -176,8 +159,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { : DataStream[Either[L, R]] = { val patternStream = CEPOperatorUtils.createTimeoutPatternStream( jPatternStream.getInputStream(), - jPatternStream.getPattern(), - lateDataOutputTag + jPatternStream.getPattern() ) val cleanedSelect = cleanClosure(patternFlatSelectFunction) @@ -338,17 +320,6 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { flatSelect(patternFlatTimeoutFun, patternFlatSelectFun) } - - /** - * Gets the {@link DataStream} that contains the elements that are emitted from an operation - * into the side output with the given {@link OutputTag}. - * - * @param tag The tag identifying a specific side output. 
- */ - @PublicEvolving - def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X] = { - asScalaStream(jPatternStream.getSideOutput(tag)) - } } object PatternStream { http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java index 04dff49..5544689 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java @@ -30,8 +30,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Either; import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; -import org.apache.flink.util.Preconditions; import java.util.List; import java.util.Map; @@ -54,19 +52,6 @@ public class PatternStream<T> { private final Pattern<T, ?> pattern; - /** - * A reference to the created pattern stream used to get - * the registered side outputs, e.g late elements side output. - */ - private SingleOutputStreamOperator<?> patternStream; - - /** - * {@link OutputTag} to use for late arriving events. Elements for which - * {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will - * be emitted to this. - */ - private OutputTag<T> lateDataOutputTag; - PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) { this.inputStream = inputStream; this.pattern = pattern; @@ -81,22 +66,6 @@ public class PatternStream<T> { } /** - * Send late arriving data to the side output identified by the given {@link OutputTag}. 
The - * CEP library assumes correctness of the watermark, so an element is considered late if its - * timestamp is smaller than the last received watermark. - */ - public PatternStream<T> sideOutputLateData(OutputTag<T> outputTag) { - Preconditions.checkNotNull(outputTag, "Side output tag must not be null."); - Preconditions.checkArgument(lateDataOutputTag == null, - "The late side output tag has already been initialized to " + lateDataOutputTag + "."); - Preconditions.checkArgument(patternStream == null, - "The late side output tag has to be set before calling select() or flatSelect()."); - - this.lateDataOutputTag = inputStream.getExecutionEnvironment().clean(outputTag); - return this; - } - - /** * Applies a select function to the detected pattern sequence. For each pattern sequence the * provided {@link PatternSelectFunction} is called. The pattern select function can produce * exactly one resulting element. @@ -137,8 +106,7 @@ public class PatternStream<T> { */ public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) { SingleOutputStreamOperator<Map<String, List<T>>> patternStream = - CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag); - this.patternStream = patternStream; + CEPOperatorUtils.createPatternStream(inputStream, pattern); return patternStream.map( new PatternSelectMapper<>( @@ -169,8 +137,7 @@ public class PatternStream<T> { final PatternSelectFunction<T, R> patternSelectFunction) { SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream = - CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag); - this.patternStream = patternStream; + CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern); TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternTimeoutFunction, @@ -240,8 +207,7 @@ public class PatternStream<T> { */ public <R> 
SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) { SingleOutputStreamOperator<Map<String, List<T>>> patternStream = - CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag); - this.patternStream = patternStream; + CEPOperatorUtils.createPatternStream(inputStream, pattern); return patternStream.flatMap( new PatternFlatSelectMapper<>( @@ -273,8 +239,7 @@ public class PatternStream<T> { final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) { SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream = - CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag); - this.patternStream = patternStream; + CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern); TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternFlatTimeoutFunction, @@ -305,18 +270,6 @@ public class PatternStream<T> { } /** - * Gets the {@link DataStream} that contains the elements that are emitted from an operation - * into the side output with the given {@link OutputTag}. - * - * @param sideOutputTag The tag identifying a specific side output. - */ - public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) { - Preconditions.checkNotNull(patternStream, "The operator has not been initialized. " + - "To have the late element side output, you have to first define the main output using select() or flatSelect()."); - return patternStream.getSideOutput(sideOutputTag); - } - - /** * Wrapper for a {@link PatternSelectFunction}. 
* * @param <T> Type of the input elements http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 3afe397..7068bc4 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -47,7 +47,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Migration; -import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; import java.io.IOException; @@ -99,13 +98,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> private transient InternalTimerService<VoidNamespace> timerService; /** - * {@link OutputTag} to use for late arriving events. Elements for which - * {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will - * be emitted to this. - */ - private final OutputTag<IN> lateDataOutputTag; - - /** * The last seen watermark. This will be used to * decide if an incoming element is late or not. 
*/ @@ -123,7 +115,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> final KeySelector<IN, KEY> keySelector, final TypeSerializer<KEY> keySerializer, final NFACompiler.NFAFactory<IN> nfaFactory, - final OutputTag<IN> lateDataOutputTag, final boolean migratingFromOldKeyedOperator) { this.inputSerializer = Preconditions.checkNotNull(inputSerializer); @@ -132,7 +123,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> this.keySerializer = Preconditions.checkNotNull(keySerializer); this.nfaFactory = Preconditions.checkNotNull(nfaFactory); - this.lateDataOutputTag = lateDataOutputTag; this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator; } @@ -203,8 +193,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> priorityQueue.offer(element); } updatePriorityQueue(priorityQueue); - } else { - sideOutputLateElement(element); } } } @@ -266,18 +254,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> this.lastWatermark = timestamp; } - /** - * Puts the provided late element in the dedicated side output, - * if the user has specified one. - * - * @param element The late element. - */ - private void sideOutputLateElement(StreamRecord<IN> element) { - if (lateDataOutputTag != null) { - output.collect(lateDataOutputTag, element); - } - } - private NFA<IN> getNFA() throws IOException { NFA<IN> nfa = nfaOperatorState.value(); return nfa != null ? 
nfa : nfaFactory.createNFA(); http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java index 065c244..08424a4 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java @@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Either; -import org.apache.flink.util.OutputTag; import java.util.List; import java.util.Map; @@ -49,7 +48,7 @@ public class CEPOperatorUtils { * @return Data stream containing fully matched event sequences stored in a {@link Map}. The * events are indexed by their associated names of the pattern. 
*/ - public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) { + public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) { final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig()); // check whether we use processing time @@ -76,7 +75,6 @@ public class CEPOperatorUtils { keySelector, keySerializer, nfaFactory, - lateDataOutputTag, true)); } else { @@ -92,7 +90,6 @@ public class CEPOperatorUtils { keySelector, keySerializer, nfaFactory, - lateDataOutputTag, false )).forceNonParallel(); } @@ -110,7 +107,7 @@ public class CEPOperatorUtils { * a {@link Either} instance. */ public static <K, T> SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> createTimeoutPatternStream( - DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) { + DataStream<T> inputStream, Pattern<T, ?> pattern) { final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig()); @@ -142,7 +139,6 @@ public class CEPOperatorUtils { keySelector, keySerializer, nfaFactory, - lateDataOutputTag, true)); } else { @@ -158,7 +154,6 @@ public class CEPOperatorUtils { keySelector, keySerializer, nfaFactory, - lateDataOutputTag, false )).forceNonParallel(); } http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java index f48f5c3..4d68afb 100644 --- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java @@ -24,7 +24,6 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.OutputTag; import java.util.Collection; import java.util.Iterator; @@ -48,10 +47,9 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, NFACompiler.NFAFactory<IN> nfaFactory, - OutputTag<IN> lateDataOutputTag, boolean migratingFromOldKeyedOperator) { - super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator); + super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java index 618a94d..9061bcb 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java @@ -25,7 +25,6 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import 
org.apache.flink.types.Either; -import org.apache.flink.util.OutputTag; import java.util.Collection; import java.util.List; @@ -48,10 +47,9 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, NFACompiler.NFAFactory<IN> nfaFactory, - OutputTag<IN> lateDataOutputTag, boolean migratingFromOldKeyedOperator) { - super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator); + super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index a6e925d..9a08659 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.types.Either; -import org.apache.flink.util.OutputTag; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -581,99 +580,4 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { env.execute(); } - - @Test - public void testLateEventSideOutput() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.setParallelism(1); - - // (Event, timestamp) - DataStream<Event> input = env.fromElements( - Tuple2.of(new Event(1, "start", 
1.0), 1L), - Tuple2.of(new Event(2, "middle", 2.0), 2L), - Tuple2.of(new Event(3, "end", 3.0), 15L), - Tuple2.of(new Event(4, "middle", 5.0), 7L), - Tuple2.of(new Event(6, "start", 1.0), 21L), - Tuple2.of(new Event(5, "middle", 5.0), 10L), - Tuple2.of(new Event(7, "middle", 2.0), 22L), - Tuple2.of(new Event(8, "end", 3.0), 23L) - ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() { - - @Override - public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) { - return element.f1; - } - - @Override - public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) { - return lastElement.f0.getName().equals("end") ? new Watermark(extractedTimestamp) : null; - } - - }).map(new MapFunction<Tuple2<Event, Long>, Event>() { - - @Override - public Event map(Tuple2<Event, Long> value) throws Exception { - return value.f0; - } - }); - - Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() { - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("start"); - } - }).followedByAny("middle").where(new SimpleCondition<Event>() { - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("middle"); - } - }).followedByAny("end").where(new SimpleCondition<Event>() { - - @Override - public boolean filter(Event value) throws Exception { - return value.getName().equals("end"); - } - }); - - final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){}; - - PatternStream<Event> patternStream = CEP.pattern(input, pattern).sideOutputLateData(lateOutputTag); - DataStream<String> result = patternStream.select( - new PatternSelectFunction<Event, String>() { - - @Override - public String select(Map<String, List<Event>> pattern) { - StringBuilder builder = new StringBuilder(); - - builder.append(pattern.get("start").get(0).getId()).append(",") - 
.append(pattern.get("middle").get(0).getId()).append(",") - .append(pattern.get("end").get(0).getId()); - return builder.toString(); - } - } - ); - - DataStream<Event> lateEvents = patternStream.getSideOutput(lateOutputTag); - - // we just care for the late events in this test. - lateEvents.map( - new MapFunction<Event, Integer>() { - - @Override - public Integer map(Event value) throws Exception { - return value.getId(); - } - } - ).writeAsText(lateEventPath, FileSystem.WriteMode.OVERWRITE); - - // the expected sequence of late event ids - expectedLateEvents = "4\n5"; - - result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); - expected = "1,2,3\n1,2,8\n1,7,8\n6,7,8"; - env.execute(); - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java index afb3e7c..789d000 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java @@ -81,7 +81,6 @@ public class CEPFrom12MigrationTest { keySelector, IntSerializer.INSTANCE, new NFAFactory(), - null, true), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -129,7 +128,6 @@ public class CEPFrom12MigrationTest { keySelector, IntSerializer.INSTANCE, new NFAFactory(), - null, true), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -204,7 +202,6 @@ public class CEPFrom12MigrationTest { keySelector, IntSerializer.INSTANCE, new NFAFactory(), - null, true), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -250,7 +247,6 @@ public class CEPFrom12MigrationTest { 
keySelector, IntSerializer.INSTANCE, new NFAFactory(), - null, true), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -337,7 +333,6 @@ public class CEPFrom12MigrationTest { keySelector, IntSerializer.INSTANCE, new SinglePatternNFAFactory(), - null, true), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -376,7 +371,6 @@ public class CEPFrom12MigrationTest { keySelector, IntSerializer.INSTANCE, new SinglePatternNFAFactory(), - null, true), keySelector, BasicTypeInfo.INT_TYPE_INFO); http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index 404de54..e5719c5 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -104,7 +104,6 @@ public class CEPMigration11to13Test { keySelector, IntSerializer.INSTANCE, new NFAFactory(), - null, true), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -179,7 +178,6 @@ public class CEPMigration11to13Test { keySelector, ByteSerializer.INSTANCE, new NFAFactory(), - null, false), keySelector, BasicTypeInfo.BYTE_TYPE_INFO); http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 5ed8b46..74bddbb 100644 --- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -227,7 +227,6 @@ public class CEPOperatorTest extends TestLogger { keySelector, IntSerializer.INSTANCE, new NFAFactory(true), - null, true), keySelector, BasicTypeInfo.INT_TYPE_INFO); @@ -487,7 +486,6 @@ public class CEPOperatorTest extends TestLogger { keySelector, IntSerializer.INSTANCE, new NFAFactory(), - null, true); } http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java index 0210ef9..9eb8da2 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java @@ -347,7 +347,6 @@ public class CEPRescalingTest { keySelector, BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()), new NFAFactory(), - null, true), keySelector, BasicTypeInfo.INT_TYPE_INFO,
