dianfu closed pull request #4513: [FLINK-6938] [cep] IterativeCondition should 
support RichFunction interface
URL: https://github.com/apache/flink/pull/4513
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it once the PR is merged):

diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 2415a2e3a23..414579a179d 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -31,7 +32,11 @@
 import org.apache.flink.cep.EventComparator;
 import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -49,6 +54,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
@@ -95,6 +101,8 @@
 
        protected final AfterMatchSkipStrategy afterMatchSkipStrategy;
 
+       private final Configuration conf;
+
        public AbstractKeyedCEPPatternOperator(
                        final TypeSerializer<IN> inputSerializer,
                        final boolean isProcessingTime,
@@ -114,6 +122,8 @@ public AbstractKeyedCEPPatternOperator(
                } else {
                        this.afterMatchSkipStrategy = afterMatchSkipStrategy;
                }
+
+               this.conf = new Configuration();
        }
 
        @Override
@@ -295,12 +305,16 @@ private void updateLastSeenWatermark(long timestamp) {
                this.lastWatermark = timestamp;
        }
 
-       private NFA<IN> getNFA() throws IOException {
+       private NFA<IN> getNFA() throws Exception {
                NFA<IN> nfa = nfaOperatorState.value();
-               return nfa != null ? nfa : nfaFactory.createNFA();
+               if (nfa == null) {
+                       nfa = nfaFactory.createNFA();
+               }
+               initStates(nfa.getStates());
+               return nfa;
        }
 
-       private void updateNFA(NFA<IN> nfa) throws IOException {
+       private void updateNFA(NFA<IN> nfa) throws Exception {
                if (nfa.isNFAChanged()) {
                        if (nfa.isEmpty()) {
                                nfaOperatorState.clear();
@@ -309,6 +323,26 @@ private void updateNFA(NFA<IN> nfa) throws IOException {
                                nfaOperatorState.update(nfa);
                        }
                }
+               destroyStates(nfa.getStates());
+       }
+
+       private void initStates(Set<State<IN>> states) throws Exception {
+               for (State<IN> state : states) {
+                       for (StateTransition<IN> transition : 
state.getStateTransitions()) {
+                               IterativeCondition condition = 
transition.getCondition();
+                               
FunctionUtils.setFunctionRuntimeContext(condition, getRuntimeContext());
+                               FunctionUtils.openFunction(condition, conf);
+                       }
+               }
+       }
+
+       private void destroyStates(Set<State<IN>> states) throws Exception {
+               for (State<IN> state : states) {
+                       for (StateTransition<IN> transition : 
state.getStateTransitions()) {
+                               IterativeCondition condition = 
transition.getCondition();
+                               FunctionUtils.closeFunction(condition);
+                       }
+               }
        }
 
        private PriorityQueue<Long> getSortedTimestamps() throws Exception {
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
index ac34c41301b..6711cee4ad8 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -26,7 +29,7 @@
  *
  * @param <T> Type of the element to filter
  */
-public class AndCondition<T> extends IterativeCondition<T> {
+public class AndCondition<T> extends RichIterativeCondition<T> {
 
        private static final long serialVersionUID = -2471892317390197319L;
 
@@ -38,6 +41,27 @@ public AndCondition(final IterativeCondition<T> left, final 
IterativeCondition<T
                this.right = Preconditions.checkNotNull(right, "The condition 
cannot be null.");
        }
 
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               super.setRuntimeContext(t);
+               FunctionUtils.setFunctionRuntimeContext(left, t);
+               FunctionUtils.setFunctionRuntimeContext(right, t);
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               FunctionUtils.openFunction(left, parameters);
+               FunctionUtils.openFunction(right, parameters);
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               FunctionUtils.closeFunction(left);
+               FunctionUtils.closeFunction(right);
+       }
+
        @Override
        public boolean filter(T value, Context<T> ctx) throws Exception {
                return left.filter(value, ctx) && right.filter(value, ctx);
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
index 9318c2f6772..17255717c41 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+
 /**
  * A {@link IterativeCondition condition} which negates the condition it wraps
  * and returns {@code true} if the original condition returns {@code false}.
  *
  * @param <T> Type of the element to filter
  */
-public class NotCondition<T> extends IterativeCondition<T> {
+public class NotCondition<T> extends RichIterativeCondition<T> {
        private static final long serialVersionUID = -2109562093871155005L;
 
        private final IterativeCondition<T> original;
@@ -33,6 +37,30 @@ public NotCondition(final IterativeCondition<T> original) {
                this.original = original;
        }
 
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               super.setRuntimeContext(t);
+               if (original != null) {
+                       FunctionUtils.setFunctionRuntimeContext(original, t);
+               }
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               if (original != null) {
+                       FunctionUtils.openFunction(original, parameters);
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               if (original != null) {
+                       FunctionUtils.closeFunction(original);
+               }
+       }
+
        @Override
        public boolean filter(T value, Context<T> ctx) throws Exception {
                return original != null && !original.filter(value, ctx);
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
index d3690ab4da0..48f332b3785 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -26,7 +29,7 @@
  *
  * @param <T> Type of the element to filter
  */
-public class OrCondition<T> extends IterativeCondition<T> {
+public class OrCondition<T> extends RichIterativeCondition<T> {
 
        private static final long serialVersionUID = 2554610954278485106L;
 
@@ -38,6 +41,27 @@ public OrCondition(final IterativeCondition<T> left, final 
IterativeCondition<T>
                this.right = Preconditions.checkNotNull(right, "The condition 
cannot be null.");
        }
 
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               super.setRuntimeContext(t);
+               FunctionUtils.setFunctionRuntimeContext(left, t);
+               FunctionUtils.setFunctionRuntimeContext(right, t);
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               FunctionUtils.openFunction(left, parameters);
+               FunctionUtils.openFunction(right, parameters);
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+               FunctionUtils.closeFunction(left);
+               FunctionUtils.closeFunction(right);
+       }
+
        @Override
        public boolean filter(T value, Context<T> ctx) throws Exception {
                return left.filter(value, ctx) || right.filter(value, ctx);
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
new file mode 100644
index 00000000000..e2ab52458f2
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern.conditions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Rich variant of the {@link IterativeCondition}. As a {@link RichFunction}, 
it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides 
setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ */
+public abstract class RichIterativeCondition<T>
+       extends IterativeCondition<T>
+       implements RichFunction {
+
+       private static final long serialVersionUID = -8877558011192469582L;
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Runtime context access
+       // 
--------------------------------------------------------------------------------------------
+
+       private transient RuntimeContext runtimeContext;
+
+       @Override
+       public void setRuntimeContext(RuntimeContext t) {
+               this.runtimeContext = t;
+       }
+
+       @Override
+       public RuntimeContext getRuntimeContext() {
+               if (this.runtimeContext != null) {
+                       return this.runtimeContext;
+               } else {
+                       throw new IllegalStateException("The runtime context 
has not been initialized.");
+               }
+       }
+
+       @Override
+       public IterationRuntimeContext getIterationRuntimeContext() {
+               throw new UnsupportedOperationException("Not support to get the 
IterationRuntimeContext in IterativeCondition.");
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Default life cycle methods
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       public void open(Configuration parameters) throws Exception {}
+
+       @Override
+       public void close() throws Exception {}
+
+}
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 81b83a395ad..9e39efff877 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -24,7 +24,9 @@
 import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -44,6 +46,8 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.fail;
+
 /**
  * End to end tests of both CEP operators and {@link NFA}.
  */
@@ -95,13 +99,7 @@ public void testSimplePatternCEP() throws Exception {
                        new Event(8, "end", 1.0)
                );
 
-               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("start");
-                       }
-               })
+               Pattern<Event, ?> pattern = 
Pattern.<Event>begin("start").where(new NameCondition("start"))
                .followedByAny("middle").subtype(SubEvent.class).where(
                                new SimpleCondition<SubEvent>() {
 
@@ -111,13 +109,7 @@ public boolean filter(SubEvent value) throws Exception {
                                        }
                                }
                        )
-               .followedByAny("end").where(new SimpleCondition<Event>() {
-
-                       @Override
-                       public boolean filter(Event value) throws Exception {
-                               return value.getName().equals("end");
-                       }
-               });
+               .followedByAny("end").where(new NameCondition("end"));
 
                DataStream<String> result = CEP.pattern(input, 
pattern).select(new PatternSelectFunction<Event, String>() {
 
@@ -141,6 +133,29 @@ public String select(Map<String, List<Event>> pattern) {
                env.execute();
        }
 
+       private static class NameCondition extends 
RichIterativeCondition<Event> {
+
+               private final String matchName;
+               private boolean isOpen = false;
+
+               private NameCondition(String matchName) {
+                       this.matchName = matchName;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       isOpen = true;
+               }
+
+               @Override
+               public boolean filter(Event value, Context<Event> ctx) throws 
Exception {
+                       if (!isOpen) {
+                               fail("open() method is not called.");
+                       }
+                       return value.getName().equals(matchName);
+               }
+       }
+
        @Test
        public void testSimpleKeyedPatternCEP() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to