This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 201571b486f [FLINK-27992] Set ALWAYS chaining strategy for CepOperator in ExecMatch
201571b486f is described below

commit 201571b486f405358a31e077247241892d537198
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed Jan 17 17:17:37 2024 +0100

    [FLINK-27992] Set ALWAYS chaining strategy for CepOperator in ExecMatch
---
 .../flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java   | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
index a5d13b00bf8..8bc4dc4ff06 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
@@ -31,6 +31,7 @@ import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.table.api.TableException;
@@ -188,6 +189,9 @@ public abstract class CommonExecMatch extends ExecNodeBase<RowData>
         transform.setStateKeySelector(selector);
         transform.setStateKeyType(selector.getProducedType());
 
+        // should be chained with the timestamp inserter
+        transform.setChainingStrategy(ChainingStrategy.ALWAYS);
+
         if (inputsContainSingleton()) {
             transform.setParallelism(1);
             transform.setMaxParallelism(1);

Reply via email to