This is an automated email from the ASF dual-hosted git repository.
ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 20450485b20 [FLINK-34222][table-planner] Supports mini-batch for
streaming regular join
20450485b20 is described below
commit 20450485b20cb213b96318b0c3275e42c0300e15
Author: yeming <[email protected]>
AuthorDate: Mon Jan 22 10:26:26 2024 +0800
[FLINK-34222][table-planner] Supports mini-batch for streaming regular join
This closes #24161.
---
.../exec/stream/StreamExecChangelogNormalize.java | 8 +--
.../stream/StreamExecGlobalGroupAggregate.java | 7 +--
.../exec/stream/StreamExecGroupAggregate.java | 7 +--
.../StreamExecIncrementalGroupAggregate.java | 3 +-
.../plan/nodes/exec/stream/StreamExecJoin.java | 44 +++++++++----
.../exec/stream/StreamExecLocalGroupAggregate.java | 3 +-
.../table/planner/plan/utils/MinibatchUtil.java | 73 ++++++++++++++++++++++
.../nodes/physical/stream/StreamPhysicalJoin.scala | 3 +-
.../table/planner/plan/utils/AggregateUtil.scala | 10 ---
.../analyze/GroupAggregationAnalyzerTest.xml | 4 +-
.../plan/stream/sql/MiniBatchIntervalInferTest.xml | 2 +-
.../planner/plan/stream/sql/join/JoinTest.xml | 4 +-
.../planner/runtime/stream/sql/JoinITCase.scala | 4 +-
.../planner/runtime/stream/table/JoinITCase.scala | 4 +-
14 files changed, 131 insertions(+), 45 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
index 710eac5eead..29171195e85 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
@@ -39,8 +38,8 @@ import
org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
@@ -145,8 +144,7 @@ public class StreamExecChangelogNormalize extends
ExecNodeBase<RowData>
final long stateRetentionTime =
StateMetadata.getStateTtlForOneInputOperator(config,
stateMetadataList);
- final boolean isMiniBatchEnabled =
-
config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+ final boolean isMiniBatchEnabled =
MinibatchUtil.isMiniBatchEnabled(config);
GeneratedRecordEqualiser generatedEqualiser =
new EqualiserCodeGenerator(
@@ -166,7 +164,7 @@ public class StreamExecChangelogNormalize extends
ExecNodeBase<RowData>
true, // generateInsert
false, // inputInsertOnly
generatedEqualiser);
- CountBundleTrigger<RowData> trigger =
AggregateUtil.createMiniBatchTrigger(config);
+ CountBundleTrigger<RowData> trigger =
MinibatchUtil.createMiniBatchTrigger(config);
operator = new KeyedMapBundleOperator<>(processFunction, trigger);
} else {
ProcTimeDeduplicateKeepLastRowFunction processFunction =
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
index ed82380a195..37809b9cc92 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
@@ -41,6 +40,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
@@ -258,8 +258,7 @@ public class StreamExecGlobalGroupAggregate extends
StreamExecAggregateBase {
.generateRecordEqualiser("GroupAggValueEqualiser");
final OneInputStreamOperator<RowData, RowData> operator;
- final boolean isMiniBatchEnabled =
-
config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+ final boolean isMiniBatchEnabled =
MinibatchUtil.isMiniBatchEnabled(config);
if (isMiniBatchEnabled) {
MiniBatchGlobalGroupAggFunction aggFunction =
new MiniBatchGlobalGroupAggFunction(
@@ -273,7 +272,7 @@ public class StreamExecGlobalGroupAggregate extends
StreamExecAggregateBase {
operator =
new KeyedMapBundleOperator<>(
- aggFunction,
AggregateUtil.createMiniBatchTrigger(config));
+ aggFunction,
MinibatchUtil.createMiniBatchTrigger(config));
} else {
throw new TableException("Local-Global optimization is only worked
in miniBatch mode");
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
index c31974cd5b6..cdcb725aca9 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
@@ -41,6 +40,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
@@ -224,8 +224,7 @@ public class StreamExecGroupAggregate extends
StreamExecAggregateBase {
aggValueTypes,
planner.getFlinkContext().getClassLoader())
.generateRecordEqualiser("GroupAggValueEqualiser");
final int inputCountIndex = aggInfoList.getIndexOfCountStar();
- final boolean isMiniBatchEnabled =
-
config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+ final boolean isMiniBatchEnabled =
MinibatchUtil.isMiniBatchEnabled(config);
final OneInputStreamOperator<RowData, RowData> operator;
if (isMiniBatchEnabled) {
@@ -240,7 +239,7 @@ public class StreamExecGroupAggregate extends
StreamExecAggregateBase {
stateRetentionTime);
operator =
new KeyedMapBundleOperator<>(
- aggFunction,
AggregateUtil.createMiniBatchTrigger(config));
+ aggFunction,
MinibatchUtil.createMiniBatchTrigger(config));
} else {
GroupAggFunction aggFunction =
new GroupAggFunction(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
index 9c4522000a5..36d20f764d4 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
@@ -38,6 +38,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
@@ -242,7 +243,7 @@ public class StreamExecIncrementalGroupAggregate extends
StreamExecAggregateBase
final OneInputStreamOperator<RowData, RowData> operator =
new KeyedMapBundleOperator<>(
- aggFunction,
AggregateUtil.createMiniBatchTrigger(config));
+ aggFunction,
MinibatchUtil.createMiniBatchTrigger(config));
// partitioned aggregation
final OneInputTransformation<RowData, RowData> transform =
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
index 68e21b1eebc..dbeb4a5dc98 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
@@ -37,10 +37,12 @@ import
org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import
org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator;
+import
org.apache.flink.table.runtime.operators.join.stream.MiniBatchStreamingJoinOperator;
import
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator;
import
org.apache.flink.table.runtime.operators.join.stream.StreamingSemiAntiJoinOperator;
import
org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
@@ -195,6 +197,7 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
AbstractStreamingJoinOperator operator;
FlinkJoinType joinType = joinSpec.getJoinType();
+ final boolean isMiniBatchEnabled =
MinibatchUtil.isMiniBatchEnabled(config);
if (joinType == FlinkJoinType.ANTI || joinType == FlinkJoinType.SEMI) {
operator =
new StreamingSemiAntiJoinOperator(
@@ -211,18 +214,35 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
boolean leftIsOuter = joinType == FlinkJoinType.LEFT || joinType
== FlinkJoinType.FULL;
boolean rightIsOuter =
joinType == FlinkJoinType.RIGHT || joinType ==
FlinkJoinType.FULL;
- operator =
- new StreamingJoinOperator(
- leftTypeInfo,
- rightTypeInfo,
- generatedCondition,
- leftInputSpec,
- rightInputSpec,
- leftIsOuter,
- rightIsOuter,
- joinSpec.getFilterNulls(),
- leftStateRetentionTime,
- rightStateRetentionTime);
+ if (isMiniBatchEnabled) {
+ operator =
+
MiniBatchStreamingJoinOperator.newMiniBatchStreamJoinOperator(
+ joinType,
+ leftTypeInfo,
+ rightTypeInfo,
+ generatedCondition,
+ leftInputSpec,
+ rightInputSpec,
+ leftIsOuter,
+ rightIsOuter,
+ joinSpec.getFilterNulls(),
+ leftStateRetentionTime,
+ rightStateRetentionTime,
+
MinibatchUtil.createMiniBatchCoTrigger(config));
+ } else {
+ operator =
+ new StreamingJoinOperator(
+ leftTypeInfo,
+ rightTypeInfo,
+ generatedCondition,
+ leftInputSpec,
+ rightInputSpec,
+ leftIsOuter,
+ rightIsOuter,
+ joinSpec.getFilterNulls(),
+ leftStateRetentionTime,
+ rightStateRetentionTime);
+ }
}
final RowType returnType = (RowType) getOutputType();
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
index 9b6ca388093..c2bc36e087f 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
@@ -35,6 +35,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
@@ -172,7 +173,7 @@ public class StreamExecLocalGroupAggregate extends
StreamExecAggregateBase {
final MapBundleOperator<RowData, RowData, RowData, RowData> operator =
new MapBundleOperator<>(
- aggFunction,
AggregateUtil.createMiniBatchTrigger(config), selector);
+ aggFunction,
MinibatchUtil.createMiniBatchTrigger(config), selector);
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
new file mode 100644
index 00000000000..d6677a18107
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.bundle.trigger.CoBundleTrigger;
+import
org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
+import
org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrigger;
+
+/**
+ * Utility class for mini-batch related config.
+ *
+ * <p>Reads the mini-batch options from a {@link ReadableConfig} and builds the count-based bundle
+ * triggers for one-input and two-input operators from the configured size.
+ */
+public class MinibatchUtil {
+
+    /**
+     * Checks whether mini-batch is enabled.
+     *
+     * @param config config to read the mini-batch enabled option from
+     * @return true if mini-batch is enabled, false otherwise
+     */
+    public static boolean isMiniBatchEnabled(ReadableConfig config) {
+        return config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+    }
+
+    /**
+     * Creates a count-based mini-batch trigger for an operator with one input.
+     *
+     * @param config config providing the mini-batch size
+     * @return trigger constructed with the configured mini-batch size
+     * @throws IllegalArgumentException if the configured mini-batch size is not positive
+     */
+    public static CountBundleTrigger<RowData> createMiniBatchTrigger(ReadableConfig config) {
+        return new CountBundleTrigger<>(miniBatchSize(config));
+    }
+
+    /**
+     * Creates a count-based mini-batch trigger for an operator with two inputs.
+     *
+     * @param config config providing the mini-batch size
+     * @return trigger constructed with the configured mini-batch size
+     * @throws IllegalArgumentException if the configured mini-batch size is not positive
+     */
+    public static CoBundleTrigger<RowData, RowData> createMiniBatchCoTrigger(
+            ReadableConfig config) {
+        return new CountCoBundleTrigger<>(miniBatchSize(config));
+    }
+
+    /**
+     * Returns the mini-batch size configured in the given config.
+     *
+     * <p>The value is validated here, in the single place every trigger factory goes through, so
+     * that a misconfiguration is reported with the name of the offending option.
+     *
+     * @param config config to read the mini-batch size option from
+     * @return the configured mini-batch size, always greater than zero
+     * @throws IllegalArgumentException if the configured mini-batch size is not positive
+     */
+    public static long miniBatchSize(ReadableConfig config) {
+        final long size = config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE);
+        if (size <= 0) {
+            throw new IllegalArgumentException(
+                    ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE + " must be > 0.");
+        }
+        return size;
+    }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
index d07dbb5a61e..9de16707e59 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.hint.StateTtlHint
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin
import
org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalJoin
-import org.apache.flink.table.planner.plan.utils.JoinUtil
+import org.apache.flink.table.planner.plan.utils.{JoinUtil, MinibatchUtil}
import org.apache.flink.table.planner.utils.ShortcutUtils.{unwrapClassLoader,
unwrapTableConfig}
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
@@ -112,6 +112,7 @@ class StreamPhysicalJoin(
getUpsertKeys(right, joinSpec.getRightKeys)
)
)
+ .itemIf("miniBatch", "true",
MinibatchUtil.isMiniBatchEnabled(unwrapTableConfig(this)))
}
override def computeSelfCost(planner: RelOptPlanner, metadata:
RelMetadataQuery): RelOptCost = {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index e9fdb74fa9e..aa16c4128d3 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -1105,16 +1105,6 @@ object AggregateUtil extends Enumeration {
(aggBufferNames ++ distinctBufferNames).toArray
}
- /** Creates a MiniBatch trigger depends on the config. */
- def createMiniBatchTrigger(config: ReadableConfig):
CountBundleTrigger[RowData] = {
- val size = config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE)
- if (size <= 0) {
- throw new IllegalArgumentException(
- ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE + " must be > 0.")
- }
- new CountBundleTrigger[RowData](size)
- }
-
/** Computes the positions of (window start, window end, row time). */
private[flink] def computeWindowPropertyPos(
properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int],
Option[Int]) = {
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/GroupAggregationAnalyzerTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/GroupAggregationAnalyzerTest.xml
index f90ee4fbab0..8acd1af1787 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/GroupAggregationAnalyzerTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/GroupAggregationAnalyzerTest.xml
@@ -391,7 +391,7 @@ SELECT * FROM r r1, r r2 WHERE r1.a = CAST(r2.b AS BIGINT)
AND r2.a > 1]]>
<Resource name="optimized rel plan with advice">
<![CDATA[
Calc(select=[c, a, b, c0, a0, b0])
-+- Join(joinType=[InnerJoin], where=[=(a, b00)], select=[c, a, b, c0, a0, b0,
b00], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey])
++- Join(joinType=[InnerJoin], where=[=(a, b00)], select=[c, a, b, c0, a0, b0,
b00], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey],
miniBatch=[true])
:- Exchange(distribution=[hash[a]])
: +- GlobalGroupAggregate(groupBy=[c], select=[c, SUM(sum$0) AS a,
SUM(sum$1) AS b])
: +- Exchange(distribution=[hash[c]])
@@ -419,7 +419,7 @@ SELECT * FROM r r1, r r2 WHERE r1.a = CAST(r2.b AS BIGINT)
AND r2.a > 1]]>
<Resource name="optimized rel plan with advice">
<![CDATA[
Calc(select=[c, a, b, c0, a0, b0])
-+- Join(joinType=[InnerJoin], where=[=(a, b00)], select=[c, a, b, c0, a0, b0,
b00], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey])
++- Join(joinType=[InnerJoin], where=[=(a, b00)], select=[c, a, b, c0, a0, b0,
b00], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey],
miniBatch=[true])
:- Exchange(distribution=[hash[a]])
: +- GroupAggregate(advice=[1], groupBy=[c], select=[c, SUM(a) AS a,
SUM(b) AS b])
: +- Exchange(distribution=[hash[c]])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 211411cb722..063f9b6637d 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -297,7 +297,7 @@ LogicalProject(a=[$0], b=[$1])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b])
-+- Join(joinType=[InnerJoin], where=[(a0 = a1)], select=[a, b, a0, a1],
leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey])
++- Join(joinType=[InnerJoin], where=[(a0 = a1)], select=[a, b, a0, a1],
leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey],
miniBatch=[true])
:- Exchange(distribution=[hash[a0]])
: +- Calc(select=[a, b, CAST(a AS BIGINT) AS a0])
: +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index 1a4d0b5e002..526804bdedf 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -591,7 +591,7 @@ LogicalSink(table=[default_catalog.default_database.sink],
fields=[a, b, d])
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[a, b, d])
+- Calc(select=[a, b, d])
- +- Join(joinType=[InnerJoin], where=[=(a, c)], select=[a, b, c, d],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])
+ +- Join(joinType=[InnerJoin], where=[=(a, c)], select=[a, b, c, d],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], miniBatch=[true])
:- Exchange(distribution=[hash[a]])
: +- MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])
: +- TableSourceScan(table=[[default_catalog, default_database,
left_table]], fields=[a, b])
@@ -602,7 +602,7 @@ Sink(table=[default_catalog.default_database.sink],
fields=[a, b, d])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink], fields=[a, b, d])
+- Calc(select=[a, b, d])
- +- Join(joinType=[InnerJoin], where=[(a = c)], select=[a, b, c, d],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])
+ +- Join(joinType=[InnerJoin], where=[(a = c)], select=[a, b, c, d],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey], miniBatch=[true])
:- Exchange(distribution=[hash[a]])
: +- MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])
: +- TableSourceScan(table=[[default_catalog, default_database,
left_table]], fields=[a, b])
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index 3b25e1e8644..a2d1abd3d67 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -25,6 +25,7 @@ import
org.apache.flink.table.planner.expressions.utils.FuncWithOpen
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils._
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import
org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchMode
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension
import org.apache.flink.types.Row
@@ -36,7 +37,8 @@ import org.junit.jupiter.api.extension.ExtendWith
import scala.collection.{mutable, Seq}
@ExtendWith(Array(classOf[ParameterizedTestExtension]))
-class JoinITCase(state: StateBackendMode) extends
StreamingWithStateTestBase(state) {
+class JoinITCase(miniBatch: MiniBatchMode, state: StateBackendMode)
+ extends StreamingWithMiniBatchTestBase(miniBatch, state) {
val smallTuple5Data = List(
(1, 1L, 0, "Hallo", 1L),
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
index 6aa4635971d..630d012b72d 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
@@ -26,6 +26,7 @@ import
org.apache.flink.table.api.internal.TableEnvironmentInternal
import org.apache.flink.table.planner.expressions.utils.FuncWithOpen
import org.apache.flink.table.planner.runtime.utils._
import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct,
WeightedAvg}
+import
org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchMode
import
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
import org.apache.flink.table.planner.runtime.utils.TestData._
import org.apache.flink.table.planner.utils.CountAggFunction
@@ -39,7 +40,8 @@ import org.junit.jupiter.api.extension.ExtendWith
import java.time.Duration
@ExtendWith(Array(classOf[ParameterizedTestExtension]))
-class JoinITCase(mode: StateBackendMode) extends
StreamingWithStateTestBase(mode) {
+class JoinITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
+ extends StreamingWithMiniBatchTestBase(miniBatch, mode) {
val data2 = List(
(1, 1L, "Hi"),