Repository: flink Updated Branches: refs/heads/master 2c20b39ff -> d443d6b02
[hotfix] Force Non-Parallel for Non-Keyed CEP Operators Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d443d6b0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d443d6b0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d443d6b0 Branch: refs/heads/master Commit: d443d6b02fbda713077a6e6b5781de5312349d3d Parents: 2c20b39 Author: Aljoscha Krettek <[email protected]> Authored: Mon Oct 24 10:29:28 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Oct 24 10:29:28 2016 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/cep/operator/CEPOperatorUtils.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d443d6b0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java index 548be16..39e2ccd 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java @@ -75,11 +75,11 @@ public class CEPOperatorUtils { patternStream = inputStream.transform( "CEPPatternOperator", (TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class), - new CEPPatternOperator<T>( + new CEPPatternOperator<>( inputSerializer, isProcessingTime, nfaFactory - )).setParallelism(1); + )).forceNonParallel(); } return patternStream; @@ -130,11 +130,11 @@ public class CEPOperatorUtils { patternStream = inputStream.transform( "TimeoutCEPPatternOperator", eitherTypeInformation, - new TimeoutCEPPatternOperator<T>( + new TimeoutCEPPatternOperator<>( inputSerializer, isProcessingTime, nfaFactory - )).setParallelism(1); + )).forceNonParallel(); } return patternStream;
