[ https://issues.apache.org/jira/browse/FLINK-5845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878218#comment-15878218 ]
ASF GitHub Bot commented on FLINK-5845:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/3375#discussion_r102462053
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---
@@ -127,16 +134,36 @@
keySerializer,
nfaFactory));
} else {
- patternStream = inputStream.transform(
+
+ KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
+ TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
+
+ patternStream = inputStream.keyBy(new NullByteKeySelector<T>()).transform(
"TimeoutCEPPatternOperator",
eitherTypeInformation,
- new TimeoutCEPPatternOperator<>(
+ new TimeoutKeyedCEPPatternOperator<>(
inputSerializer,
isProcessingTime,
+ keySelector,
+ keySerializer,
nfaFactory
)).forceNonParallel();
}
return patternStream;
}
+
+ /**
+ * Used as dummy KeySelector to allow using WindowOperator for Non-Keyed Windows.
--- End diff --
Does not seem to fit here. Copy & paste artifact?
> CEP: unify key and non-keyed operators
> --------------------------------------
>
> Key: FLINK-5845
> URL: https://issues.apache.org/jira/browse/FLINK-5845
> Project: Flink
> Issue Type: Sub-task
> Components: CEP
> Affects Versions: 1.3.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> Currently the keyed and non-keyed operators in the CEP library have different
> implementations. This issue aims to unify them into one.
> The new implementation will always be applied on a keyed stream. For
> non-keyed use cases, the input stream will be keyed on a dummy key, as is
> done in the {{DataStream.windowAll()}} method, where the input stream is
> keyed using the {{NullByteKeySelector}}.
> This is a first step towards making the CEP operators rescalable.
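
For readers unfamiliar with the dummy-key trick described in the issue, here is a minimal sketch of the idea. It does not use Flink: `SimpleKeySelector`, `DummyByteKeySelector`, and `DummyKeyDemo` are hypothetical stand-ins written for illustration only, assuming that the real `NullByteKeySelector` behaves similarly by mapping every element to the same `Byte` key. The point is that a constant key puts all elements into a single group, so an operator written only for keyed streams can also serve the non-keyed case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a key selector interface, so the sketch compiles without Flink. */
interface SimpleKeySelector<IN, KEY> {
    KEY getKey(IN value);
}

/** Maps every element to the same dummy key (assumed to mirror the idea behind NullByteKeySelector). */
final class DummyByteKeySelector<T> implements SimpleKeySelector<T, Byte> {
    @Override
    public Byte getKey(T value) {
        return (byte) 0; // constant key: all elements share one partition
    }
}

final class DummyKeyDemo {
    /** Groups elements by their key, roughly as keyed state would be partitioned. */
    static <T, K> Map<K, List<T>> groupByKey(List<T> input, SimpleKeySelector<T, K> selector) {
        Map<K, List<T>> groups = new HashMap<>();
        for (T element : input) {
            groups.computeIfAbsent(selector.getKey(element), k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<Byte, List<String>> groups =
                groupByKey(Arrays.asList("a", "b", "c"), new DummyByteKeySelector<String>());
        // With a constant key there is exactly one group holding every element.
        System.out.println(groups.size());
    }
}
```

Because every element lands under the same key, the operator effectively runs with a parallelism of one, which is consistent with the `forceNonParallel()` call kept in the diff above.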
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)