[ 
https://issues.apache.org/jira/browse/FLINK-12297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16824899#comment-16824899
 ] 

aitozi commented on FLINK-12297:
--------------------------------

Hi [~SleePy], [~dawidwys],
As I mentioned in 
[FLINK-12113|https://issues.apache.org/jira/browse/FLINK-12113]: _Can we 
enable recursive cleaning in ClosureCleaner#clean by checking the class fields 
recursively? That way we don't have to take care of every wrapper 
function; we only have to invoke the cleaner at each entry point where a 
user function is registered._
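
To make the idea of checking class fields recursively concrete, here is a minimal sketch. It is not Flink's ClosureCleaner and does not call any Flink API; the class RecursiveFieldCheck, the method findNonSerializable, and the Adapter/Tag example types are hypothetical names made up for illustration. It only reports which nested fields hold non-serializable values, and it assumes that descending into user classes (and stopping at JDK classes and arrays) is enough for the illustration. A real cleaner would also have to decide what to do with each field it finds.

```java
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Flink: walks an object graph field by field.
final class RecursiveFieldCheck {

    /** Returns the field paths (starting at "root") whose values are not Serializable. */
    public static List<String> findNonSerializable(Object root) {
        List<String> problems = new ArrayList<>();
        // Identity-based set so that cyclic references do not cause endless recursion.
        Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        visit(root, "root", visited, problems);
        return problems;
    }

    private static void visit(Object obj, String path, Set<Object> visited, List<String> problems) {
        if (obj == null || !visited.add(obj)) {
            return;
        }
        if (!(obj instanceof Serializable)) {
            problems.add(path);
            return;
        }
        Class<?> type = obj.getClass();
        // Assumption of this sketch: do not reflect into arrays or JDK classes.
        if (type.isArray() || type.getName().startsWith("java.")) {
            return;
        }
        for (Class<?> c = type; c != null && !c.getName().startsWith("java."); c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                int mods = field.getModifiers();
                // Static and transient fields are not serialized; primitives are always fine.
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.getType().isPrimitive()) {
                    continue;
                }
                field.setAccessible(true);
                try {
                    visit(field.get(obj), path + "." + field.getName(), visited, problems);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }

    // Example types: a serializable wrapper whose nested tag holds a non-serializable value.
    public static class Tag implements Serializable {
        Object owner = new Object();
    }

    public static class Adapter implements Serializable {
        String name = "adapter";
        Tag tag = new Tag();
    }

    public static void main(String[] args) {
        System.out.println(findNonSerializable(new Adapter())); // prints [root.tag.owner]
    }
}
```

With a traversal like this, the wrapper (Adapter in the sketch) does not need its own handling: the nested field is reached from the single call made on the outermost object.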

> Clean the closure for OutputTags in PatternStream
> -------------------------------------------------
>
>                 Key: FLINK-12297
>                 URL: https://issues.apache.org/jira/browse/FLINK-12297
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.8.0
>            Reporter: Dawid Wysakowicz
>            Priority: Blocker
>             Fix For: 1.9.0, 1.8.1
>
>
> Right now we do not invoke the closure cleaner on output tags. Therefore code 
> such as:
> {code}
>       @Test
>       public void testFlatSelectSerialization() throws Exception {
>               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>               DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);
>               OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA") {};
>               CEP.pattern(elements, Pattern.begin("A")).flatSelect(
>                       outputTag,
>                       new PatternFlatTimeoutFunction<Integer, Integer>() {
>                               @Override
>                               public void timeout(
>                                       Map<String, List<Integer>> pattern,
>                                       long timeoutTimestamp,
>                                       Collector<Integer> out) throws Exception {
>                               }
>                       },
>                       new PatternFlatSelectFunction<Integer, Object>() {
>                               @Override
>                               public void flatSelect(Map<String, List<Integer>> pattern, Collector<Object> out) throws Exception {
>                               }
>                       }
>               );
>               env.execute();
>       }
> {code}
> will fail with the exception {{The implementation of the 
> PatternFlatSelectAdapter is not serializable.}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
