dawidwys closed pull request #7110: [FLINK-8159] [cep]
Pattern(Flat)SelectFunctions should support RichFunction interface
URL: https://github.com/apache/flink/pull/7110
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
index 33d24de7748..91a15032a91 100644
---
a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
+++
b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
@@ -113,7 +113,7 @@ class PatternTest {
assertTrue(pattern.getCondition.isDefined)
assertTrue(previous.getCondition.isDefined)
- assertFalse(preprevious.getCondition.isDefined)
+ assertTrue(preprevious.getCondition.isDefined)
assertEquals(pattern.getName, "end")
assertEquals(previous.getName, "next")
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
new file mode 100644
index 00000000000..ff6e610845c
--- /dev/null
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.metrics.MetricGroup;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class for pattern select function and iterative condition
function's {@link RuntimeContext}.
+ * The runtime context only supports basic operations. Consequently, state
access, accumulators,
+ * broadcast variables and the distributed cache are disabled.
+ */
+public class CepRuntimeContext implements RuntimeContext {
+
+ private final RuntimeContext runtimeContext;
+
+ public CepRuntimeContext(RuntimeContext runtimeContext) {
+ this.runtimeContext = runtimeContext;
+ }
+
+ @Override
+ public String getTaskName() {
+ return runtimeContext.getTaskName();
+ }
+
+ @Override
+ public MetricGroup getMetricGroup() {
+ return runtimeContext.getMetricGroup();
+ }
+
+ @Override
+ public int getNumberOfParallelSubtasks() {
+ return runtimeContext.getNumberOfParallelSubtasks();
+ }
+
+ @Override
+ public int getMaxNumberOfParallelSubtasks() {
+ return runtimeContext.getMaxNumberOfParallelSubtasks();
+ }
+
+ @Override
+ public int getIndexOfThisSubtask() {
+ return runtimeContext.getIndexOfThisSubtask();
+ }
+
+ @Override
+ public int getAttemptNumber() {
+ return runtimeContext.getAttemptNumber();
+ }
+
+ @Override
+ public String getTaskNameWithSubtasks() {
+ return runtimeContext.getTaskNameWithSubtasks();
+ }
+
+ @Override
+ public ExecutionConfig getExecutionConfig() {
+ return runtimeContext.getExecutionConfig();
+ }
+
+ @Override
+ public ClassLoader getUserCodeClassLoader() {
+ return runtimeContext.getUserCodeClassLoader();
+ }
+
+ //
-----------------------------------------------------------------------------------
+ // Unsupported operations
+ //
-----------------------------------------------------------------------------------
+
+ @Override
+ public <V, A extends Serializable> void addAccumulator(
+ String name, Accumulator<V, A> accumulator) {
+ throw new UnsupportedOperationException("Accumulators are not
supported.");
+ }
+
+ @Override
+ public <V, A extends Serializable> Accumulator<V, A>
getAccumulator(String name) {
+ throw new UnsupportedOperationException("Accumulators are not
supported.");
+ }
+
+ @Override
+ public Map<String, Accumulator<?, ?>> getAllAccumulators() {
+ throw new UnsupportedOperationException("Accumulators are not
supported.");
+ }
+
+ @Override
+ public IntCounter getIntCounter(String name) {
+ throw new UnsupportedOperationException("Int counters are not
supported.");
+ }
+
+ @Override
+ public LongCounter getLongCounter(String name) {
+ throw new UnsupportedOperationException("Long counters are not
supported.");
+ }
+
+ @Override
+ public DoubleCounter getDoubleCounter(String name) {
+ throw new UnsupportedOperationException("Double counters are
not supported.");
+ }
+
+ @Override
+ public Histogram getHistogram(String name) {
+ throw new UnsupportedOperationException("Histograms are not
supported.");
+ }
+
+ @Override
+ public boolean hasBroadcastVariable(String name) {
+ throw new UnsupportedOperationException("Broadcast variables
are not supported.");
+ }
+
+ @Override
+ public <RT> List<RT> getBroadcastVariable(String name) {
+ throw new UnsupportedOperationException("Broadcast variables
are not supported.");
+ }
+
+ @Override
+ public <T, C> C getBroadcastVariableWithInitializer(
+ String name, BroadcastVariableInitializer<T, C> initializer) {
+ throw new UnsupportedOperationException("Broadcast variables
are not supported.");
+ }
+
+ @Override
+ public DistributedCache getDistributedCache() {
+ throw new UnsupportedOperationException("Distributed cache is
not supported.");
+ }
+
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T>
stateProperties) {
+ throw new UnsupportedOperationException("State is not
supported.");
+ }
+
+ @Override
+ public <T> ListState<T> getListState(ListStateDescriptor<T>
stateProperties) {
+ throw new UnsupportedOperationException("State is not
supported.");
+ }
+
+ @Override
+ public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T>
stateProperties) {
+ throw new UnsupportedOperationException("State is not
supported.");
+ }
+
+ @Override
+ public <IN, ACC, OUT> AggregatingState<IN, OUT>
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+ throw new UnsupportedOperationException("State is not
supported.");
+ }
+
+ @Override
+ public <T, ACC> FoldingState<T, ACC>
getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+ throw new UnsupportedOperationException("State is not
supported.");
+ }
+
+ @Override
+ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>
stateProperties) {
+ throw new UnsupportedOperationException("State is not
supported.");
+ }
+}
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
new file mode 100644
index 00000000000..a2b89c37e97
--- /dev/null
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rich variant of the {@link PatternFlatSelectFunction}. As a {@link
RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output element
+ */
+public abstract class RichPatternFlatSelectFunction<IN, OUT>
+ extends AbstractRichFunction
+ implements PatternFlatSelectFunction<IN, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void setRuntimeContext(RuntimeContext runtimeContext) {
+ Preconditions.checkNotNull(runtimeContext);
+
+ if (runtimeContext instanceof CepRuntimeContext) {
+ super.setRuntimeContext(runtimeContext);
+ } else {
+ super.setRuntimeContext(new
CepRuntimeContext(runtimeContext));
+ }
+ }
+
+ public abstract void flatSelect(Map<String, List<IN>> pattern,
Collector<OUT> out) throws Exception;
+}
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java
new file mode 100644
index 00000000000..ce694a38e05
--- /dev/null
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rich variant of the {@link PatternSelectFunction}. As a {@link
RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output element
+ */
+public abstract class RichPatternSelectFunction<IN, OUT>
+ extends AbstractRichFunction
+ implements PatternSelectFunction<IN, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void setRuntimeContext(RuntimeContext runtimeContext) {
+ Preconditions.checkNotNull(runtimeContext);
+
+ if (runtimeContext instanceof CepRuntimeContext) {
+ super.setRuntimeContext(runtimeContext);
+ } else {
+ super.setRuntimeContext(new
CepRuntimeContext(runtimeContext));
+ }
+ }
+
+ public abstract OUT select(Map<String, List<IN>> pattern) throws
Exception;
+}
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index ed2ff2e7e59..bc692f2bc0c 100644
---
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -29,10 +29,10 @@
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.Quantifier;
import org.apache.flink.cep.pattern.Quantifier.Times;
-import org.apache.flink.cep.pattern.conditions.AndCondition;
import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
-import org.apache.flink.cep.pattern.conditions.NotCondition;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichNotCondition;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.io.Serializable;
@@ -288,9 +288,9 @@ private void checkPatternNameUniqueness(final Pattern
pattern) {
if (lastSink.isFinal()) {
//so that the proceed to final
is not fired
- notNext.addIgnore(lastSink, new
NotCondition<>(notCondition));
+ notNext.addIgnore(lastSink, new
RichNotCondition<>(notCondition));
} else {
- notNext.addProceed(lastSink,
new NotCondition<>(notCondition));
+ notNext.addProceed(lastSink,
new RichNotCondition<>(notCondition));
}
notNext.addProceed(stopState,
notCondition);
lastSink = notNext;
@@ -612,11 +612,11 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
if (untilCondition != null) {
singletonState.addProceed(
originalStateMap.get(proceedState.getName()),
- new
AndCondition<>(proceedCondition, untilCondition));
+ new
RichAndCondition<>(proceedCondition, untilCondition));
}
singletonState.addProceed(proceedState,
untilCondition != null
- ? new
AndCondition<>(proceedCondition, new NotCondition<>(untilCondition))
+ ? new
RichAndCondition<>(proceedCondition, new RichNotCondition<>(untilCondition))
: proceedCondition);
} else {
singletonState.addProceed(proceedState,
proceedCondition);
@@ -734,12 +734,12 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
if
(currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY))
{
if (untilCondition != null) {
State<T> sinkStateCopy =
copy(sinkState);
- loopingState.addProceed(sinkStateCopy,
new AndCondition<>(proceedCondition, untilCondition));
+ loopingState.addProceed(sinkStateCopy,
new RichAndCondition<>(proceedCondition, untilCondition));
originalStateMap.put(sinkState.getName(), sinkStateCopy);
}
loopingState.addProceed(sinkState,
untilCondition != null
- ? new
AndCondition<>(proceedCondition, new NotCondition<>(untilCondition))
+ ? new
RichAndCondition<>(proceedCondition, new RichNotCondition<>(untilCondition))
: proceedCondition);
updateWithGreedyCondition(sinkState,
getTakeCondition(currentPattern));
} else {
@@ -774,9 +774,9 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
IterativeCondition<T> untilCondition,
boolean isTakeCondition) {
if (untilCondition != null && condition != null) {
- return new AndCondition<>(new
NotCondition<>(untilCondition), condition);
+ return new RichAndCondition<>(new
RichNotCondition<>(untilCondition), condition);
} else if (untilCondition != null && isTakeCondition) {
- return new NotCondition<>(untilCondition);
+ return new RichNotCondition<>(untilCondition);
}
return condition;
@@ -802,7 +802,7 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
innerIgnoreCondition = null;
break;
case SKIP_TILL_NEXT:
- innerIgnoreCondition = new
NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+ innerIgnoreCondition = new
RichNotCondition<>((IterativeCondition<T>) pattern.getCondition());
break;
case SKIP_TILL_ANY:
innerIgnoreCondition =
BooleanConditions.trueFunction();
@@ -843,7 +843,7 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
ignoreCondition = null;
break;
case SKIP_TILL_NEXT:
- ignoreCondition = new
NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+ ignoreCondition = new
RichNotCondition<>((IterativeCondition<T>) pattern.getCondition());
break;
case SKIP_TILL_ANY:
ignoreCondition =
BooleanConditions.trueFunction();
@@ -896,7 +896,7 @@ private void updateWithGreedyCondition(
IterativeCondition<T> takeCondition) {
for (StateTransition<T> stateTransition :
state.getStateTransitions()) {
stateTransition.setCondition(
- new
AndCondition<>(stateTransition.getCondition(), new
NotCondition<>(takeCondition)));
+ new
RichAndCondition<>(stateTransition.getCondition(), new
RichNotCondition<>(takeCondition)));
}
}
}
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index b57c3fe0b2f..c603741cea5 100644
---
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -20,6 +20,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
@@ -33,10 +34,14 @@
import org.apache.flink.cep.nfa.NFA.MigratedNFA;
import org.apache.flink.cep.nfa.NFAState;
import org.apache.flink.cep.nfa.NFAStateSerializer;
+import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.StateTransition;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
@@ -185,6 +190,15 @@ public void open() throws Exception {
this);
this.nfa = nfaFactory.createNFA();
+
+ openNFA(nfa);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+
+ closeNFA(nfa);
}
@Override
@@ -395,6 +409,26 @@ private void advanceTime(NFAState nfaState, long
timestamp) throws Exception {
}
}
+ private void openNFA(NFA<IN> nfa) throws Exception {
+ Configuration conf = new Configuration();
+ for (State<IN> state : nfa.getStates()) {
+ for (StateTransition<IN> transition :
state.getStateTransitions()) {
+ IterativeCondition condition =
transition.getCondition();
+
FunctionUtils.setFunctionRuntimeContext(condition, getRuntimeContext());
+ FunctionUtils.openFunction(condition, conf);
+ }
+ }
+ }
+
+ private void closeNFA(NFA<IN> nfa) throws Exception {
+ for (State<IN> state : nfa.getStates()) {
+ for (StateTransition<IN> transition :
state.getStateTransitions()) {
+ IterativeCondition condition =
transition.getCondition();
+ FunctionUtils.closeFunction(condition);
+ }
+ }
+ }
+
protected abstract void processMatchedSequences(Iterable<Map<String,
List<IN>>> matchingSequences, long timestamp) throws Exception;
protected void processTimedOutSequences(
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 6ad9d9a1669..64236765caf 100644
---
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -23,9 +23,10 @@
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
import org.apache.flink.cep.pattern.Quantifier.Times;
-import org.apache.flink.cep.pattern.conditions.AndCondition;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
-import org.apache.flink.cep.pattern.conditions.OrCondition;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichOrCondition;
import org.apache.flink.cep.pattern.conditions.SubtypeCondition;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Preconditions;
@@ -105,7 +106,11 @@ public Quantifier getQuantifier() {
}
public IterativeCondition<F> getCondition() {
- return condition;
+ if (condition != null) {
+ return condition;
+ } else {
+ return BooleanConditions.trueFunction();
+ }
}
public IterativeCondition<F> getUntilCondition() {
@@ -154,7 +159,7 @@ public Quantifier getQuantifier() {
if (this.condition == null) {
this.condition = condition;
} else {
- this.condition = new AndCondition<>(this.condition,
condition);
+ this.condition = new RichAndCondition<>(this.condition,
condition);
}
return this;
}
@@ -177,7 +182,7 @@ public Quantifier getQuantifier() {
if (this.condition == null) {
this.condition = condition;
} else {
- this.condition = new OrCondition<>(this.condition,
condition);
+ this.condition = new RichOrCondition<>(this.condition,
condition);
}
return this;
}
@@ -196,7 +201,7 @@ public Quantifier getQuantifier() {
if (condition == null) {
this.condition = new SubtypeCondition<F>(subtypeClass);
} else {
- this.condition = new AndCondition<>(condition, new
SubtypeCondition<F>(subtypeClass));
+ this.condition = new RichAndCondition<>(condition, new
SubtypeCondition<F>(subtypeClass));
}
@SuppressWarnings("unchecked")
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
index ac34c41301b..4622417bcf1 100644
---
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
@@ -18,6 +18,7 @@
package org.apache.flink.cep.pattern.conditions;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
/**
@@ -25,7 +26,11 @@
* {@code AND} and returns {@code true} if both are {@code true}.
*
* @param <T> Type of the element to filter
+ * @deprecated Please use {@link RichAndCondition} instead. This class exists
just for
+ * backwards compatibility and will be removed in FLINK-10113.
*/
+@Internal
+@Deprecated
public class AndCondition<T> extends IterativeCondition<T> {
private static final long serialVersionUID = -2471892317390197319L;
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
index aea5a3bdd43..17c443fa0d9 100644
---
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
@@ -18,10 +18,13 @@
package org.apache.flink.cep.pattern.conditions;
+import org.apache.flink.annotation.Internal;
+
/**
* Utility class containing an {@link IterativeCondition} that always returns
* {@code true} and one that always returns {@code false}.
*/
+@Internal
public class BooleanConditions {
/**
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
index 9318c2f6772..72dc4bb8697 100644
---
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
@@ -18,12 +18,18 @@
package org.apache.flink.cep.pattern.conditions;
+import org.apache.flink.annotation.Internal;
+
/**
* A {@link IterativeCondition condition} which negates the condition it wraps
* and returns {@code true} if the original condition returns {@code false}.
*
* @param <T> Type of the element to filter
+ * @deprecated Please use {@link RichNotCondition} instead. This class exists
just for
+ * backwards compatibility and will be removed in FLINK-10113.
*/
+@Internal
+@Deprecated
public class NotCondition<T> extends IterativeCondition<T> {
private static final long serialVersionUID = -2109562093871155005L;
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
index d3690ab4da0..ac8c465c59f 100644
---
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
@@ -18,6 +18,7 @@
package org.apache.flink.cep.pattern.conditions;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
/**
@@ -25,7 +26,11 @@
* {@code OR} and returns {@code true} if at least one is {@code true}.
*
* @param <T> Type of the element to filter
+ * @deprecated Please use {@link RichOrCondition} instead. This class exists
just for
+ * backwards compatibility and will be removed in FLINK-10113.
*/
+@Internal
+@Deprecated
public class OrCondition<T> extends IterativeCondition<T> {
private static final long serialVersionUID = 2554610954278485106L;
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
new file mode 100644
index 00000000000..1d5d1e7367d
--- /dev/null
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichAndCondition.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions
with a logical
+ * {@code AND} and returns {@code true} if both are {@code true}.
+ *
+ * @param <T> Type of the element to filter
+ */
+@Internal
+public class RichAndCondition<T> extends RichCompositeIterativeCondition<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ public RichAndCondition(final IterativeCondition<T> left, final
IterativeCondition<T> right) {
+ super(left, right);
+ }
+
+ @Override
+ public boolean filter(T value, Context<T> ctx) throws Exception {
+ return getLeft().filter(value, ctx) && getRight().filter(value,
ctx);
+ }
+
+ /**
+ * @return One of the {@link IterativeCondition conditions} combined in
this condition.
+ */
+ public IterativeCondition<T> getLeft() {
+ return getNestedConditions()[0];
+ }
+
+ /**
+ * @return One of the {@link IterativeCondition conditions} combined in
this condition.
+ */
+ public IterativeCondition<T> getRight() {
+ return getNestedConditions()[1];
+ }
+}
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichCompositeIterativeCondition.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichCompositeIterativeCondition.java
new file mode 100644
index 00000000000..5f7789adba6
--- /dev/null
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichCompositeIterativeCondition.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A base class of composite {@link IterativeCondition} conditions such as
{@link RichAndCondition},
+ * {@link RichOrCondition} and {@link RichNotCondition}, etc. It handles the
open, close and
+ * setRuntimeContext for the nested {@link IterativeCondition} conditions.
+ *
+ * @param <T> Type of the element to filter
+ */
+public abstract class RichCompositeIterativeCondition<T> extends
RichIterativeCondition<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final IterativeCondition<T>[] nestedConditions;
+
+ @SafeVarargs
+ public RichCompositeIterativeCondition(final IterativeCondition<T>...
nestedConditions) {
+ for (IterativeCondition<T> condition : nestedConditions) {
+ Preconditions.checkNotNull(condition, "The condition
cannot be null.");
+ }
+ this.nestedConditions = nestedConditions;
+ }
+
+ public IterativeCondition<T>[] getNestedConditions() {
+ return nestedConditions;
+ }
+
+ @Override
+ public void setRuntimeContext(RuntimeContext t) {
+ super.setRuntimeContext(t);
+
+ for (IterativeCondition<T> nestedCondition : nestedConditions) {
+
FunctionUtils.setFunctionRuntimeContext(nestedCondition, t);
+ }
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ for (IterativeCondition<T> nestedCondition : nestedConditions) {
+ FunctionUtils.openFunction(nestedCondition, parameters);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ for (IterativeCondition<T> nestedCondition : nestedConditions) {
+ FunctionUtils.closeFunction(nestedCondition);
+ }
+ }
+}
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
new file mode 100644
index 00000000000..12f017e9946
--- /dev/null
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.cep.CepRuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Rich variant of the {@link IterativeCondition}. As a {@link RichFunction},
it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ */
+public abstract class RichIterativeCondition<T>
+ extends IterativeCondition<T>
+ implements RichFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ //
--------------------------------------------------------------------------------------------
+ // Runtime context access
+ //
--------------------------------------------------------------------------------------------
+
+ private transient RuntimeContext runtimeContext;
+
+ @Override
+ public void setRuntimeContext(RuntimeContext runtimeContext) {
+ Preconditions.checkNotNull(runtimeContext);
+
+ if (runtimeContext instanceof CepRuntimeContext) {
+ this.runtimeContext = runtimeContext;
+ } else {
+ this.runtimeContext = new
CepRuntimeContext(runtimeContext);
+ }
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ if (this.runtimeContext != null) {
+ return this.runtimeContext;
+ } else {
+ throw new IllegalStateException("The runtime context
has not been initialized.");
+ }
+ }
+
+ @Override
+ public IterationRuntimeContext getIterationRuntimeContext() {
+ throw new UnsupportedOperationException("Not support to get the
IterationRuntimeContext in IterativeCondition.");
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Default life cycle methods
+ //
--------------------------------------------------------------------------------------------
+
+ @Override
+ public void open(Configuration parameters) throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+
+}
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
new file mode 100644
index 00000000000..a4929eb5e15
--- /dev/null
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichNotCondition.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A {@link RichIterativeCondition condition} which negates the condition it
wraps
+ * and returns {@code true} if the original condition returns {@code false}.
+ *
+ * @param <T> Type of the element to filter
+ */
+@Internal
+public class RichNotCondition<T> extends RichCompositeIterativeCondition<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ public RichNotCondition(final IterativeCondition<T> original) {
+ super(original);
+ }
+
+ @Override
+ public boolean filter(T value, Context<T> ctx) throws Exception {
+ return !getNestedConditions()[0].filter(value, ctx);
+ }
+}
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java
new file mode 100644
index 00000000000..03bd1e73092
--- /dev/null
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichOrCondition.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * A {@link RichIterativeCondition condition} which combines two conditions
with a logical
+ * {@code OR} and returns {@code true} if at least one is {@code true}.
+ *
+ * @param <T> Type of the element to filter
+ */
+@Internal
+public class RichOrCondition<T> extends RichCompositeIterativeCondition<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ public RichOrCondition(final IterativeCondition<T> left, final
IterativeCondition<T> right) {
+ super(left, right);
+ }
+
+ @Override
+ public boolean filter(T value, Context<T> ctx) throws Exception {
+ return getLeft().filter(value, ctx) || getRight().filter(value,
ctx);
+ }
+
+ /**
+ * @return One of the {@link IterativeCondition conditions} combined in
this condition.
+ */
+ public IterativeCondition<T> getLeft() {
+ return getNestedConditions()[0];
+ }
+
+ /**
+ * @return One of the {@link IterativeCondition conditions} combined in
this condition.
+ */
+ public IterativeCondition<T> getRight() {
+ return getNestedConditions()[1];
+ }
+}
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
index 9ca52c5c8d8..de46d1f756d 100644
---
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SimpleCondition.java
@@ -18,6 +18,7 @@
package org.apache.flink.cep.pattern.conditions;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.FilterFunction;
/**
@@ -28,6 +29,7 @@
* previously accepted elements in the pattern. Conditions that extend this
class are simple {@code filter(...)}
* functions that decide based on the properties of the element at hand.
*/
+@Internal
public abstract class SimpleCondition<T> extends IterativeCondition<T>
implements FilterFunction<T> {
private static final long serialVersionUID = 4942618239408140245L;
diff --git
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
index cff8693c588..249757d8e59 100644
---
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
+++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
@@ -18,6 +18,7 @@
package org.apache.flink.cep.pattern.conditions;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
/**
@@ -26,6 +27,7 @@
*
* @param <T> Type of the elements to be filtered
*/
+@Internal
public class SubtypeCondition<T> extends SimpleCondition<T> {
private static final long serialVersionUID = -2990017519957561355L;
diff --git
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index e397d318241..33c1fdd571c 100644
---
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -19,13 +19,17 @@
package org.apache.flink.cep;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
@@ -35,6 +39,7 @@
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Either;
+import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -716,4 +721,167 @@ public boolean filter(Tuple2<Integer, String> rec) throws
Exception {
assertEquals(expected, resultList);
}
+
+ @Test
+ public void testRichPatternFlatSelectFunction() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Event> input = env.fromElements(
+ new Event(1, "barfoo", 1.0),
+ new Event(2, "start", 2.0),
+ new Event(3, "foobar", 3.0),
+ new SubEvent(4, "foo", 4.0, 1.0),
+ new Event(5, "middle", 5.0),
+ new SubEvent(6, "middle", 6.0, 2.0),
+ new SubEvent(7, "bar", 3.0, 3.0),
+ new Event(42, "42", 42.0),
+ new Event(8, "end", 1.0)
+ );
+
+ Pattern<Event, ?> pattern =
Pattern.<Event>begin("start").where(new RichIterativeCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value, Context<Event> ctx)
throws Exception {
+ return value.getName().equals("start");
+ }
+ }).followedByAny("middle").subtype(SubEvent.class).where(
+ new SimpleCondition<SubEvent>() {
+
+ @Override
+ public boolean filter(SubEvent value) throws
Exception {
+ return value.getName().equals("middle");
+ }
+ }
+ ).followedByAny("end").where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ });
+
+ DataStream<String> result =
+ CEP.pattern(input, pattern).flatSelect(new
RichPatternFlatSelectFunction<Event, String>() {
+
+ @Override
+ public void open(Configuration config) {
+ try {
+
getRuntimeContext().getMapState(new MapStateDescriptor<>(
+ "test",
+ LongSerializer.INSTANCE,
+
LongSerializer.INSTANCE));
+ throw new
RuntimeException("Expected getMapState to fail with unsupported operation
exception.");
+ } catch (UnsupportedOperationException
e) {
+ // ignore, expected
+ }
+
+
getRuntimeContext().getUserCodeClassLoader();
+ }
+
+ @Override
+ public void flatSelect(Map<String, List<Event>>
p, Collector<String> o) throws Exception {
+ StringBuilder builder = new
StringBuilder();
+
+
builder.append(p.get("start").get(0).getId()).append(",")
+
.append(p.get("middle").get(0).getId()).append(",")
+
.append(p.get("end").get(0).getId());
+
+ o.collect(builder.toString());
+ }
+ }, Types.STRING);
+
+ List<String> resultList = new ArrayList<>();
+
+
DataStreamUtils.collect(result).forEachRemaining(resultList::add);
+
+ assertEquals(Arrays.asList("2,6,8"), resultList);
+ }
+
+ @Test
+ public void testRichPatternSelectFunction() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ DataStream<Event> input = env.fromElements(
+ new Event(1, "barfoo", 1.0),
+ new Event(2, "start", 2.0),
+ new Event(3, "start", 2.1),
+ new Event(3, "foobar", 3.0),
+ new SubEvent(4, "foo", 4.0, 1.0),
+ new SubEvent(3, "middle", 3.2, 1.0),
+ new Event(42, "start", 3.1),
+ new SubEvent(42, "middle", 3.3, 1.2),
+ new Event(5, "middle", 5.0),
+ new SubEvent(2, "middle", 6.0, 2.0),
+ new SubEvent(7, "bar", 3.0, 3.0),
+ new Event(42, "42", 42.0),
+ new Event(3, "end", 2.0),
+ new Event(2, "end", 1.0),
+ new Event(42, "end", 42.0)
+ ).keyBy(new KeySelector<Event, Integer>() {
+
+ @Override
+ public Integer getKey(Event value) throws Exception {
+ return value.getId();
+ }
+ });
+
+ Pattern<Event, ?> pattern =
Pattern.<Event>begin("start").where(new RichIterativeCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value, Context<Event> ctx)
throws Exception {
+ return value.getName().equals("start");
+ }
+ }).followedByAny("middle").subtype(SubEvent.class).where(
+ new SimpleCondition<SubEvent>() {
+
+ @Override
+ public boolean filter(SubEvent value) throws
Exception {
+ return value.getName().equals("middle");
+ }
+ }
+ ).followedByAny("end").where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws
Exception {
+ return value.getName().equals("end");
+ }
+ });
+
+ DataStream<String> result = CEP.pattern(input,
pattern).select(new RichPatternSelectFunction<Event, String>() {
+ @Override
+ public void open(Configuration config) {
+ try {
+ getRuntimeContext().getMapState(new
MapStateDescriptor<>(
+ "test",
+ LongSerializer.INSTANCE,
+ LongSerializer.INSTANCE));
+ throw new RuntimeException("Expected
getMapState to fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // ignore, expected
+ }
+
+ getRuntimeContext().getUserCodeClassLoader();
+ }
+
+ @Override
+ public String select(Map<String, List<Event>> p) throws
Exception {
+ StringBuilder builder = new StringBuilder();
+
+
builder.append(p.get("start").get(0).getId()).append(",")
+
.append(p.get("middle").get(0).getId()).append(",")
+ .append(p.get("end").get(0).getId());
+
+ return builder.toString();
+ }
+ });
+
+ List<String> resultList = new ArrayList<>();
+
+
DataStreamUtils.collect(result).forEachRemaining(resultList::add);
+
+ resultList.sort(String::compareTo);
+
+ assertEquals(Arrays.asList("2,2,2", "3,3,3", "42,42,42"),
resultList);
+ }
}
diff --git
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
new file mode 100644
index 00000000000..6bc4081da54
--- /dev/null
+++
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichNotCondition;
+import org.apache.flink.cep.pattern.conditions.RichOrCondition;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link CepRuntimeContext}.
+ */
+public class CepRuntimeContextTest {
+
+ @Test
+ public void testRichCompositeIterativeCondition() throws Exception {
+ RichIterativeCondition<Integer> first = new
TestRichIterativeCondition();
+ RichIterativeCondition<Integer> second = new
TestRichIterativeCondition();
+ RichIterativeCondition<Integer> third = new
TestRichIterativeCondition();
+
+ RichCompositeIterativeCondition function = new
RichCompositeIterativeCondition(first, second, third) {
+ @Override
+ public boolean filter(Object value, Context ctx) throws
Exception {
+ return false;
+ }
+ };
+ function.setRuntimeContext(mock(RuntimeContext.class));
+
+ assertTrue(first.getRuntimeContext() instanceof
CepRuntimeContext);
+ assertTrue(second.getRuntimeContext() instanceof
CepRuntimeContext);
+ assertTrue(third.getRuntimeContext() instanceof
CepRuntimeContext);
+ }
+
+ @Test
+ public void testRichAndCondition() throws Exception {
+ RichIterativeCondition<Integer> left = new
TestRichIterativeCondition();
+ RichIterativeCondition<Integer> right = new
TestRichIterativeCondition();
+
+ RichAndCondition function = new RichAndCondition<>(left, right);
+ function.setRuntimeContext(mock(RuntimeContext.class));
+
+ assertTrue(left.getRuntimeContext() instanceof
CepRuntimeContext);
+ assertTrue(right.getRuntimeContext() instanceof
CepRuntimeContext);
+ }
+
+ @Test
+ public void testRichOrCondition() throws Exception {
+ RichIterativeCondition<Integer> left = new
TestRichIterativeCondition();
+ RichIterativeCondition<Integer> right = new
TestRichIterativeCondition();
+
+ RichOrCondition function = new RichOrCondition<>(left, right);
+ function.setRuntimeContext(mock(RuntimeContext.class));
+
+ assertTrue(left.getRuntimeContext() instanceof
CepRuntimeContext);
+ assertTrue(right.getRuntimeContext() instanceof
CepRuntimeContext);
+ }
+
+ @Test
+ public void testRichNotCondition() {
+ RichIterativeCondition<Integer> original = new
TestRichIterativeCondition();
+
+ RichNotCondition function = new RichNotCondition<>(original);
+ function.setRuntimeContext(mock(RuntimeContext.class));
+
+ assertTrue(original.getRuntimeContext() instanceof
CepRuntimeContext);
+ }
+
+ @Test
+ public void testRichPatternSelectFunction() {
+ verifyRuntimeContext(new TestRichPatternSelectFunction());
+ }
+
+ @Test
+ public void testRichPatternFlatSelectFunction() {
+ verifyRuntimeContext(new TestRichPatternFlatSelectFunction());
+ }
+
+ @Test
+ public void testRichIterativeCondition() {
+ verifyRuntimeContext(new TestRichIterativeCondition());
+ }
+
+ private void verifyRuntimeContext(final RichFunction function) {
+ final String taskName = "foobarTask";
+ final MetricGroup metricGroup = new UnregisteredMetricsGroup();
+ final int numberOfParallelSubtasks = 42;
+ final int indexOfSubtask = 43;
+ final int attemptNumber = 1337;
+ final String taskNameWithSubtask = "barfoo";
+ final ExecutionConfig executionConfig =
mock(ExecutionConfig.class);
+ final ClassLoader userCodeClassLoader = mock(ClassLoader.class);
+
+ RuntimeContext mockedRuntimeContext =
mock(RuntimeContext.class);
+
+ when(mockedRuntimeContext.getTaskName()).thenReturn(taskName);
+
when(mockedRuntimeContext.getMetricGroup()).thenReturn(metricGroup);
+
when(mockedRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(numberOfParallelSubtasks);
+
when(mockedRuntimeContext.getIndexOfThisSubtask()).thenReturn(indexOfSubtask);
+
when(mockedRuntimeContext.getAttemptNumber()).thenReturn(attemptNumber);
+
when(mockedRuntimeContext.getTaskNameWithSubtasks()).thenReturn(taskNameWithSubtask);
+
when(mockedRuntimeContext.getExecutionConfig()).thenReturn(executionConfig);
+
when(mockedRuntimeContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
+
+ function.setRuntimeContext(mockedRuntimeContext);
+
+ RuntimeContext runtimeContext = function.getRuntimeContext();
+
+ assertTrue(runtimeContext instanceof CepRuntimeContext);
+ assertEquals(taskName, runtimeContext.getTaskName());
+ assertEquals(metricGroup, runtimeContext.getMetricGroup());
+ assertEquals(numberOfParallelSubtasks,
runtimeContext.getNumberOfParallelSubtasks());
+ assertEquals(indexOfSubtask,
runtimeContext.getIndexOfThisSubtask());
+ assertEquals(attemptNumber, runtimeContext.getAttemptNumber());
+ assertEquals(taskNameWithSubtask,
runtimeContext.getTaskNameWithSubtasks());
+ assertEquals(executionConfig,
runtimeContext.getExecutionConfig());
+ assertEquals(userCodeClassLoader,
runtimeContext.getUserCodeClassLoader());
+
+ try {
+ runtimeContext.getDistributedCache();
+ fail("Expected getDistributedCached to fail with
unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getState(new
ValueStateDescriptor<>("foobar", Integer.class, 42));
+ fail("Expected getState to fail with unsupported
operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getListState(new
ListStateDescriptor<>("foobar", Integer.class));
+ fail("Expected getListState to fail with unsupported
operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getReducingState(new
ReducingStateDescriptor<>(
+ "foobar",
+ mock(ReduceFunction.class),
+ Integer.class));
+ fail("Expected getReducingState to fail with
unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getAggregatingState(new
AggregatingStateDescriptor<>(
+ "foobar",
+ mock(AggregateFunction.class),
+ Integer.class));
+ fail("Expected getAggregatingState to fail with
unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getFoldingState(new
FoldingStateDescriptor<>(
+ "foobar",
+ 0,
+ mock(FoldFunction.class),
+ Integer.class));
+ fail("Expected getFoldingState to fail with unsupported
operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getMapState(new
MapStateDescriptor<>("foobar", Integer.class, String.class));
+ fail("Expected getMapState to fail with unsupported
operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.addAccumulator("foobar",
mock(Accumulator.class));
+ fail("Expected addAccumulator to fail with unsupported
operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getAccumulator("foobar");
+ fail("Expected getAccumulator to fail with unsupported
operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getAllAccumulators();
+ fail("Expected getAllAccumulators to fail with
unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getIntCounter("foobar");
+ fail("Expected getIntCounter to fail with unsupported
operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getLongCounter("foobar");
+ fail("Expected getLongCounter to fail with unsupported
operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getDoubleCounter("foobar");
+ fail("Expected getDoubleCounter to fail with
unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getHistogram("foobar");
+ fail("Expected getHistogram to fail with unsupported
operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.hasBroadcastVariable("foobar");
+ fail("Expected hasBroadcastVariable to fail with
unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getBroadcastVariable("foobar");
+ fail("Expected getBroadcastVariable to fail with
unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ try {
+ runtimeContext.getBroadcastVariableWithInitializer(
+ "foobar",
+ mock(BroadcastVariableInitializer.class));
+ fail("Expected getBroadcastVariableWithInitializer to
fail with unsupported operation exception.");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+ }
+
+ private static class TestRichIterativeCondition extends
RichIterativeCondition<Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(Integer value, Context<Integer> ctx)
throws Exception {
+ return false;
+ }
+ }
+
+ private static class TestRichPatternSelectFunction extends
RichPatternSelectFunction<Integer, Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer select(Map<String, List<Integer>> pattern)
throws Exception {
+ return null;
+ }
+ }
+
+ private static class TestRichPatternFlatSelectFunction extends
RichPatternFlatSelectFunction<Integer, Integer> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatSelect(Map<String, List<Integer>> pattern,
Collector<Integer> out) throws Exception {
+ // no op
+ }
+ }
+}
diff --git
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index 6d93ff3a3b4..743425226f2 100644
---
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -21,7 +21,9 @@
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
-import org.apache.flink.cep.pattern.conditions.OrCondition;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.RichAndCondition;
+import org.apache.flink.cep.pattern.conditions.RichOrCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.pattern.conditions.SubtypeCondition;
import org.apache.flink.util.TestLogger;
@@ -33,6 +35,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
/**
* Tests for constructing {@link Pattern}.
@@ -102,7 +105,7 @@ public boolean filter(Event value) throws Exception {
assertNotNull(pattern.getCondition());
assertNotNull(previous.getCondition());
- assertNull(previous2.getCondition());
+ assertNotNull(previous2.getCondition());
assertEquals(pattern.getName(), "end");
assertEquals(previous.getName(), "next");
@@ -187,14 +190,27 @@ public boolean filter(Event value) throws Exception {
assertNull(previous2.getPrevious());
assertEquals(ConsumingStrategy.SKIP_TILL_NEXT,
pattern.getQuantifier().getConsumingStrategy());
- assertFalse(previous.getCondition() instanceof OrCondition);
- assertTrue(previous2.getCondition() instanceof OrCondition);
+ assertFalse(previous.getCondition() instanceof RichOrCondition);
+ assertTrue(previous2.getCondition() instanceof RichOrCondition);
assertEquals(pattern.getName(), "end");
assertEquals(previous.getName(), "or");
assertEquals(previous2.getName(), "start");
}
+ @Test
+ public void testRichCondition() {
+ Pattern<Event, Event> pattern =
+ Pattern.<Event>begin("start")
+ .where(mock(IterativeCondition.class))
+ .where(mock(IterativeCondition.class))
+ .followedBy("end")
+ .where(mock(IterativeCondition.class))
+ .or(mock(IterativeCondition.class));
+ assertTrue(pattern.getCondition() instanceof RichOrCondition);
+ assertTrue(pattern.getPrevious().getCondition() instanceof
RichAndCondition);
+ }
+
@Test(expected = IllegalArgumentException.class)
public void testPatternTimesNegativeTimes() throws Exception {
Pattern.begin("start").where(dummyCondition()).times(-1);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services