This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 201571b486f [FLINK-27992] Set ALWAYS chaining strategy for CepOperator
in ExecMatch
201571b486f is described below
commit 201571b486f405358a31e077247241892d537198
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Wed Jan 17 17:17:37 2024 +0100
[FLINK-27992] Set ALWAYS chaining strategy for CepOperator in ExecMatch
---
.../flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
index a5d13b00bf8..8bc4dc4ff06 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
@@ -31,6 +31,7 @@ import org.apache.flink.cep.pattern.Quantifier;
import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.TableException;
@@ -188,6 +189,9 @@ public abstract class CommonExecMatch extends
ExecNodeBase<RowData>
transform.setStateKeySelector(selector);
transform.setStateKeyType(selector.getProducedType());
+ // should be chained with the timestamp inserter
+ transform.setChainingStrategy(ChainingStrategy.ALWAYS);
+
if (inputsContainSingleton()) {
transform.setParallelism(1);
transform.setMaxParallelism(1);