[ 
https://issues.apache.org/jira/browse/FLINK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692764#comment-16692764
 ] 

ASF GitHub Bot commented on FLINK-8159:
---------------------------------------

dianfu closed pull request #5080: [FLINK-8159] [cep] Add rich support for 
SelectWrapper and FlatSelectWrapper
URL: https://github.com/apache/flink/pull/5080
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
index 4423bb1dd40..95225271b9e 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
@@ -20,7 +20,8 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
@@ -28,6 +29,7 @@
 import org.apache.flink.cep.PatternFlatTimeoutFunction;
 import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.util.OutputTag;
 
@@ -102,7 +104,7 @@ protected void processTimedOutSequences(
         * in one udf.
         */
        @Internal
-       public static class FlatSelectWrapper<IN, OUT1, OUT2> implements 
Function {
+       public static class FlatSelectWrapper<IN, OUT1, OUT2> extends 
AbstractRichFunction {
 
                private static final long serialVersionUID = 
-8320546120157150202L;
 
@@ -125,5 +127,25 @@ public FlatSelectWrapper(
                        this.flatSelectFunction = flatSelectFunction;
                        this.flatTimeoutFunction = flatTimeoutFunction;
                }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       if (flatSelectFunction instanceof RichFunction) {
+                               ((RichFunction) 
flatSelectFunction).open(parameters);
+                       }
+                       if (flatTimeoutFunction instanceof RichFunction) {
+                               ((RichFunction) 
flatTimeoutFunction).open(parameters);
+                       }
+               }
+
+               @Override
+               public void close() throws Exception {
+                       if (flatSelectFunction instanceof RichFunction) {
+                               ((RichFunction) flatSelectFunction).close();
+                       }
+                       if (flatTimeoutFunction instanceof RichFunction) {
+                               ((RichFunction) flatTimeoutFunction).close();
+                       }
+               }
        }
 }
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
index cb233a486ec..18a1454cf4c 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
@@ -19,7 +19,8 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
@@ -27,6 +28,7 @@
 import org.apache.flink.cep.PatternTimeoutFunction;
 import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;
 
@@ -54,8 +56,8 @@ public SelectTimeoutCepOperator(
                NFACompiler.NFAFactory<IN> nfaFactory,
                final EventComparator<IN> comparator,
                AfterMatchSkipStrategy skipStrategy,
-               PatternSelectFunction<IN, OUT1> flatSelectFunction,
-               PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction,
+               PatternSelectFunction<IN, OUT1> selectFunction,
+               PatternTimeoutFunction<IN, OUT2> timeoutFunction,
                OutputTag<OUT2> outputTag) {
                super(
                        inputSerializer,
@@ -63,14 +65,14 @@ public SelectTimeoutCepOperator(
                        nfaFactory,
                        comparator,
                        skipStrategy,
-                       new SelectWrapper<>(flatSelectFunction, 
flatTimeoutFunction));
+                       new SelectWrapper<>(selectFunction, timeoutFunction));
                this.timedOutOutputTag = outputTag;
        }
 
        @Override
        protected void processMatchedSequences(Iterable<Map<String, List<IN>>> 
matchingSequences, long timestamp) throws Exception {
                for (Map<String, List<IN>> match : matchingSequences) {
-                       output.collect(new 
StreamRecord<>(getUserFunction().getFlatSelectFunction().select(match), 
timestamp));
+                       output.collect(new 
StreamRecord<>(getUserFunction().getSelectFunction().select(match), timestamp));
                }
        }
 
@@ -80,7 +82,7 @@ protected void processTimedOutSequences(
                for (Tuple2<Map<String, List<IN>>, Long> match : 
timedOutSequences) {
                        output.collect(timedOutOutputTag,
                                new StreamRecord<>(
-                                       
getUserFunction().getFlatTimeoutFunction().timeout(match.f0, match.f1),
+                                       
getUserFunction().getTimeoutFunction().timeout(match.f0, match.f1),
                                        timestamp));
                }
        }
@@ -93,26 +95,46 @@ protected void processTimedOutSequences(
         * @param <OUT2> Type of the timed out output elements
         */
        @Internal
-       public static class SelectWrapper<IN, OUT1, OUT2> implements Function {
+       public static class SelectWrapper<IN, OUT1, OUT2> extends 
AbstractRichFunction {
 
                private static final long serialVersionUID = 
-8320546120157150202L;
 
-               private PatternSelectFunction<IN, OUT1> flatSelectFunction;
-               private PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction;
+               private PatternSelectFunction<IN, OUT1> selectFunction;
+               private PatternTimeoutFunction<IN, OUT2> timeoutFunction;
 
-               PatternSelectFunction<IN, OUT1> getFlatSelectFunction() {
-                       return flatSelectFunction;
+               PatternSelectFunction<IN, OUT1> getSelectFunction() {
+                       return selectFunction;
                }
 
-               PatternTimeoutFunction<IN, OUT2> getFlatTimeoutFunction() {
-                       return flatTimeoutFunction;
+               PatternTimeoutFunction<IN, OUT2> getTimeoutFunction() {
+                       return timeoutFunction;
                }
 
                public SelectWrapper(
-                       PatternSelectFunction<IN, OUT1> flatSelectFunction,
-                       PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction) {
-                       this.flatSelectFunction = flatSelectFunction;
-                       this.flatTimeoutFunction = flatTimeoutFunction;
+                       PatternSelectFunction<IN, OUT1> selectFunction,
+                       PatternTimeoutFunction<IN, OUT2> timeoutFunction) {
+                       this.selectFunction = selectFunction;
+                       this.timeoutFunction = timeoutFunction;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       if (selectFunction instanceof RichFunction) {
+                               ((RichFunction) 
selectFunction).open(parameters);
+                       }
+                       if (timeoutFunction instanceof RichFunction) {
+                               ((RichFunction) 
timeoutFunction).open(parameters);
+                       }
+               }
+
+               @Override
+               public void close() throws Exception {
+                       if (selectFunction instanceof RichFunction) {
+                               ((RichFunction) selectFunction).close();
+                       }
+                       if (timeoutFunction instanceof RichFunction) {
+                               ((RichFunction) timeoutFunction).close();
+                       }
                }
        }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Pattern(Flat)SelectFunctions should support RichFunction interface
> ------------------------------------------------------------------
>
>                 Key: FLINK-8159
>                 URL: https://issues.apache.org/jira/browse/FLINK-8159
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> {{SelectWrapper}} and {{FlatSelectWrapper}} should extends 
> {{AbstractRichFucntion}} and process properly if the underlying functions 
> extend RichFunction.
> Things to be very careful about:
> * backwards compatibility (we previously serialized conditions) - changes to 
> those interfaces have to be done carefully
> * we want to be able to add dynamic patterns in the future, so at some point 
> we have to open also on control message arrival



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to