[
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16139638#comment-16139638
]
Dawid Wysakowicz commented on FLINK-7129:
-----------------------------------------
My initial plan was to use {{connect}} as you described, but
unfortunately the key is then not propagated, so we cannot use a
{{KeyedStateBackend}} in the {{operator}}.
Recently I gave it another go and managed to glue it together, but it would
require some changes to the {{flink-streaming-java}} module (as the constructor
of {{SingleOutputStreamOperator}} is protected). It also relies on the
assumption that the {{KeySelector}} of the second {{inputStream}} is only used
when accessing {{KeyedState}} in {{processElement2}}. I would love to hear
opinions on that approach from [~kkl0u] and [~aljoscha].
This is the interesting part of creating the {{CoCepPatternOperator}}:
{code}
KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
TypeSerializer<K> keySerializer =
    keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());

TwoInputTransformation<T, Pattern<T, ?>, Map<String, List<T>>> transform =
    new TwoInputTransformation<>(
        keyedStream.getTransformation(),
        dynamicPatternsStream.broadcast().getTransformation(),
        "KeyedCEPCoPatternOperator",
        new CoCepPatternOperator<>(
            inputSerializer,
            isProcessingTime,
            keySerializer,
            nfaFactory,
            true),
        (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>)
            TypeExtractor.getForClass(Map.class),
        keyedStream.getExecutionEnvironment().getParallelism());

// Only the first (keyed) input gets a key selector; the broadcast pattern input has none.
transform.setStateKeySelectors(keyedStream.getKeySelector(), null);
transform.setStateKeyType(keyedStream.getKeyType());

patternStream =
    new SingleOutputStreamOperator(keyedStream.getExecutionEnvironment(), transform);
keyedStream.getExecutionEnvironment().addOperator(transform);
{code}
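To make the assumption about the missing second {{KeySelector}} concrete, here is a minimal plain-Java sketch. It is a toy model only and does not use any Flink API: {{ToyCoOperator}}, {{input1}}, {{input2}}, {{input2TouchingKeyedState}} and {{countFor}} are hypothetical names invented for illustration. The model sets a "current key" before each element of the keyed input and sets none for the broadcast input, then chooses to fail loudly if keyed state is touched without a key. What the real runtime does in that situation is not modeled here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Toy model (NOT Flink code) of a two-input operator whose keyed state is
 * scoped by a "current key" that the runtime sets before each element.
 */
class ToyCoOperator<T, P, K> {
    private final Function<T, K> keySelector1; // plays the role of keyedStream.getKeySelector()
    private final Function<P, K> keySelector2; // may be null, like the second argument above
    private final Map<K, Integer> keyedCounts = new HashMap<>(); // stands in for keyed state
    private K currentKey;     // key context, set per element
    private P currentPattern; // non-keyed data shared by all keys

    ToyCoOperator(Function<T, K> keySelector1, Function<P, K> keySelector2) {
        this.keySelector1 = keySelector1;
        this.keySelector2 = keySelector2;
    }

    /** Keyed input: set the key context, then update state for that key. */
    void input1(T element) {
        currentKey = keySelector1.apply(element);
        keyedCounts.merge(requireKey(), 1, Integer::sum);
    }

    /** Broadcast input: no key context is needed as long as keyed state is not touched. */
    void input2(P pattern) {
        currentKey = (keySelector2 == null) ? null : keySelector2.apply(pattern);
        currentPattern = pattern;
    }

    /** Same as input2, but also touches keyed state, which violates the assumption. */
    void input2TouchingKeyedState(P pattern) {
        input2(pattern);
        keyedCounts.merge(requireKey(), 1, Integer::sum);
    }

    /** In this toy model, keyed-state access without a key fails loudly. */
    private K requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("keyed state accessed without a current key");
        }
        return currentKey;
    }

    int countFor(K key) {
        return keyedCounts.getOrDefault(key, 0);
    }

    P currentPattern() {
        return currentPattern;
    }
}
```

With a key selector only for the first input, {{input1}} and {{input2}} both work in this model, while {{input2TouchingKeyedState}} throws. That is the reason the approach above depends on {{processElement2}} never accessing {{KeyedState}}.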
I have some WIP code for dynamic CEP patterns in my branch:
https://github.com/dawidwys/flink/tree/cep-dynamic-nfa
> Dynamically changing patterns
> -----------------------------
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
> Issue Type: New Feature
> Components: CEP
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing a mechanism for injecting patterns through
> a coStream