This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 201571b486f [FLINK-27992] Set ALWAYS chaining strategy for CepOperator in ExecMatch 201571b486f is described below commit 201571b486f405358a31e077247241892d537198 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Wed Jan 17 17:17:37 2024 +0100 [FLINK-27992] Set ALWAYS chaining strategy for CepOperator in ExecMatch --- .../flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java index a5d13b00bf8..8bc4dc4ff06 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java @@ -31,6 +31,7 @@ import org.apache.flink.cep.pattern.Quantifier; import org.apache.flink.cep.pattern.conditions.BooleanConditions; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.table.api.TableException; @@ -188,6 +189,9 @@ public abstract class CommonExecMatch extends ExecNodeBase<RowData> transform.setStateKeySelector(selector); transform.setStateKeyType(selector.getProducedType()); + // should be chained with the timestamp inserter + transform.setChainingStrategy(ChainingStrategy.ALWAYS); + if (inputsContainSingleton()) { transform.setParallelism(1); transform.setMaxParallelism(1);