[ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16139638#comment-16139638
 ] 

Dawid Wysakowicz commented on FLINK-7129:
-----------------------------------------

My initial plan was to use {{connect}} as you described, but unfortunately 
the key is not propagated in that case, so we cannot use a 
{{KeyedStateBackend}} in the {{operator}}.

Recently I gave it another go and managed to glue it together, but it would 
require some changes to the {{flink-streaming-java}} module (as the 
constructor of {{SingleOutputStreamOperator}} is protected). It is also based 
on the assumption that the {{KeySelector}} of the second {{inputStream}} is 
only used when accessing {{KeyedState}} in {{processElement2}}. I would love 
to hear opinions on that approach from [~kkl0u] and [~aljoscha].

This is the interesting part of creating the {{CoCepPatternOperator}}:

{code}
// inputStream must already be keyed; its key selector scopes the keyed state.
KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;

TypeSerializer<K> keySerializer =
        keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());

// Input 1: the keyed events. Input 2: the patterns, broadcast to all instances.
TwoInputTransformation<T, Pattern<T, ?>, Map<String, List<T>>> transform =
        new TwoInputTransformation<>(
                keyedStream.getTransformation(),
                dynamicPatternsStream.broadcast().getTransformation(),
                "KeyedCEPCoPatternOperator",
                new CoCepPatternOperator<>(
                        inputSerializer,
                        isProcessingTime,
                        keySerializer,
                        nfaFactory,
                        true),
                (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>)
                        TypeExtractor.getForClass(Map.class),
                keyedStream.getExecutionEnvironment().getParallelism());

// Only the first input gets a key selector; the second one is null.
transform.setStateKeySelectors(keyedStream.getKeySelector(), null);
transform.setStateKeyType(keyedStream.getKeyType());

// This call needs the (currently protected) constructor to be accessible.
patternStream = new SingleOutputStreamOperator<>(
        keyedStream.getExecutionEnvironment(), transform);

keyedStream.getExecutionEnvironment().addOperator(transform);
{code}
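
To make the assumption about the second input easier to discuss, here is a 
minimal plain-Java toy model. It is not Flink code and not what is in my 
branch: {{ToyKeyedState}}, {{ToyCoOperator}} and {{bufferedFor}} are 
hypothetical names invented for this sketch, and patterns are plain strings. 
The toy clears the key context when the second selector is null so that any 
misuse fails loudly; how the real runtime behaves in that case is not 
modelled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy model (not Flink code): keyed state is scoped by input 1 only. */
public class KeyedTwoInputSketch {

    /** Stand-in for a keyed state backend: one event buffer per key. */
    static final class ToyKeyedState<K, T> {
        private final Map<K, List<T>> buffers = new HashMap<>();
        private K currentKey; // null means "no key context"

        void setCurrentKey(K key) {
            currentKey = key;
        }

        List<T> buffer() {
            if (currentKey == null) {
                throw new IllegalStateException("keyed state accessed without a key");
            }
            return buffers.computeIfAbsent(currentKey, k -> new ArrayList<>());
        }
    }

    /** Input 1 carries keyed events, input 2 carries broadcast patterns. */
    static final class ToyCoOperator<K, T> {
        private final Function<T, K> keySelector1;
        private final Function<String, K> keySelector2; // may be null
        final ToyKeyedState<K, T> state = new ToyKeyedState<>();
        final List<String> patterns = new ArrayList<>(); // non-keyed data

        ToyCoOperator(Function<T, K> keySelector1, Function<String, K> keySelector2) {
            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
        }

        void processElement1(T event) {
            // The key context comes from the first input's selector.
            state.setCurrentKey(keySelector1.apply(event));
            state.buffer().add(event);
        }

        void processElement2(String pattern) {
            // Modelling choice of this toy: no selector means no key context.
            state.setCurrentKey(keySelector2 == null ? null : keySelector2.apply(pattern));
            // Safe, because only non-keyed data is touched here.
            patterns.add(pattern);
        }

        int bufferedFor(K key) {
            state.setCurrentKey(key);
            return state.buffer().size();
        }
    }

    public static void main(String[] args) {
        // Key = first character of the event; no selector for the pattern input.
        ToyCoOperator<String, String> op =
                new ToyCoOperator<String, String>(e -> e.substring(0, 1), null);
        op.processElement1("a1");
        op.processElement1("a2");
        op.processElement1("b1");
        op.processElement2("pattern-1");
        System.out.println("patterns seen: " + op.patterns.size());
        System.out.println("events buffered for key a: " + op.bufferedFor("a"));
    }
}
```

In this toy, {{processElement2}} works only because it never touches the 
per-key buffers. Calling {{state.buffer()}} right after it would throw, which 
is the situation the assumption above is meant to rule out.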

I have some WIP code for dynamic CEP patterns in my branch: 
https://github.com/dawidwys/flink/tree/cep-dynamic-nfa

> Dynamically changing patterns
> -----------------------------
>
>                 Key: FLINK-7129
>                 URL: https://issues.apache.org/jira/browse/FLINK-7129
>             Project: Flink
>          Issue Type: New Feature
>          Components: CEP
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing a mechanism for injecting patterns through 
> a coStream



