This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 20450485b20 [FLINK-34222][table-planner] Supports mini-batch for 
streaming regular join
20450485b20 is described below

commit 20450485b20cb213b96318b0c3275e42c0300e15
Author: yeming <[email protected]>
AuthorDate: Mon Jan 22 10:26:26 2024 +0800

    [FLINK-34222][table-planner] Supports mini-batch for streaming regular join
    
    This closes #24161.
---
 .../exec/stream/StreamExecChangelogNormalize.java  |  8 +--
 .../stream/StreamExecGlobalGroupAggregate.java     |  7 +--
 .../exec/stream/StreamExecGroupAggregate.java      |  7 +--
 .../StreamExecIncrementalGroupAggregate.java       |  3 +-
 .../plan/nodes/exec/stream/StreamExecJoin.java     | 44 +++++++++----
 .../exec/stream/StreamExecLocalGroupAggregate.java |  3 +-
 .../table/planner/plan/utils/MinibatchUtil.java    | 73 ++++++++++++++++++++++
 .../nodes/physical/stream/StreamPhysicalJoin.scala |  3 +-
 .../table/planner/plan/utils/AggregateUtil.scala   | 10 ---
 .../analyze/GroupAggregationAnalyzerTest.xml       |  4 +-
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml |  2 +-
 .../planner/plan/stream/sql/join/JoinTest.xml      |  4 +-
 .../planner/runtime/stream/sql/JoinITCase.scala    |  4 +-
 .../planner/runtime/stream/table/JoinITCase.scala  |  4 +-
 14 files changed, 131 insertions(+), 45 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
index 710eac5eead..29171195e85 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
@@ -39,8 +38,8 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.AggregateUtil;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
@@ -145,8 +144,7 @@ public class StreamExecChangelogNormalize extends 
ExecNodeBase<RowData>
 
         final long stateRetentionTime =
                 StateMetadata.getStateTtlForOneInputOperator(config, 
stateMetadataList);
-        final boolean isMiniBatchEnabled =
-                
config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+        final boolean isMiniBatchEnabled = 
MinibatchUtil.isMiniBatchEnabled(config);
 
         GeneratedRecordEqualiser generatedEqualiser =
                 new EqualiserCodeGenerator(
@@ -166,7 +164,7 @@ public class StreamExecChangelogNormalize extends 
ExecNodeBase<RowData>
                             true, // generateInsert
                             false, // inputInsertOnly
                             generatedEqualiser);
-            CountBundleTrigger<RowData> trigger = 
AggregateUtil.createMiniBatchTrigger(config);
+            CountBundleTrigger<RowData> trigger = 
MinibatchUtil.createMiniBatchTrigger(config);
             operator = new KeyedMapBundleOperator<>(processFunction, trigger);
         } else {
             ProcTimeDeduplicateKeepLastRowFunction processFunction =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
index ed82380a195..37809b9cc92 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
@@ -41,6 +40,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
@@ -258,8 +258,7 @@ public class StreamExecGlobalGroupAggregate extends 
StreamExecAggregateBase {
                         .generateRecordEqualiser("GroupAggValueEqualiser");
 
         final OneInputStreamOperator<RowData, RowData> operator;
-        final boolean isMiniBatchEnabled =
-                
config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+        final boolean isMiniBatchEnabled = 
MinibatchUtil.isMiniBatchEnabled(config);
         if (isMiniBatchEnabled) {
             MiniBatchGlobalGroupAggFunction aggFunction =
                     new MiniBatchGlobalGroupAggFunction(
@@ -273,7 +272,7 @@ public class StreamExecGlobalGroupAggregate extends 
StreamExecAggregateBase {
 
             operator =
                     new KeyedMapBundleOperator<>(
-                            aggFunction, 
AggregateUtil.createMiniBatchTrigger(config));
+                            aggFunction, 
MinibatchUtil.createMiniBatchTrigger(config));
         } else {
             throw new TableException("Local-Global optimization is only worked 
in miniBatch mode");
         }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
index c31974cd5b6..cdcb725aca9 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
@@ -41,6 +40,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
@@ -224,8 +224,7 @@ public class StreamExecGroupAggregate extends 
StreamExecAggregateBase {
                                 aggValueTypes, 
planner.getFlinkContext().getClassLoader())
                         .generateRecordEqualiser("GroupAggValueEqualiser");
         final int inputCountIndex = aggInfoList.getIndexOfCountStar();
-        final boolean isMiniBatchEnabled =
-                
config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+        final boolean isMiniBatchEnabled = 
MinibatchUtil.isMiniBatchEnabled(config);
 
         final OneInputStreamOperator<RowData, RowData> operator;
         if (isMiniBatchEnabled) {
@@ -240,7 +239,7 @@ public class StreamExecGroupAggregate extends 
StreamExecAggregateBase {
                             stateRetentionTime);
             operator =
                     new KeyedMapBundleOperator<>(
-                            aggFunction, 
AggregateUtil.createMiniBatchTrigger(config));
+                            aggFunction, 
MinibatchUtil.createMiniBatchTrigger(config));
         } else {
             GroupAggFunction aggFunction =
                     new GroupAggFunction(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
index 9c4522000a5..36d20f764d4 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
@@ -242,7 +243,7 @@ public class StreamExecIncrementalGroupAggregate extends 
StreamExecAggregateBase
 
         final OneInputStreamOperator<RowData, RowData> operator =
                 new KeyedMapBundleOperator<>(
-                        aggFunction, 
AggregateUtil.createMiniBatchTrigger(config));
+                        aggFunction, 
MinibatchUtil.createMiniBatchTrigger(config));
 
         // partitioned aggregation
         final OneInputTransformation<RowData, RowData> transform =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
index 68e21b1eebc..dbeb4a5dc98 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
@@ -37,10 +37,12 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.JoinUtil;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import 
org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoinOperator;
+import 
org.apache.flink.table.runtime.operators.join.stream.MiniBatchStreamingJoinOperator;
 import 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator;
 import 
org.apache.flink.table.runtime.operators.join.stream.StreamingSemiAntiJoinOperator;
 import 
org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
@@ -195,6 +197,7 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
 
         AbstractStreamingJoinOperator operator;
         FlinkJoinType joinType = joinSpec.getJoinType();
+        final boolean isMiniBatchEnabled = 
MinibatchUtil.isMiniBatchEnabled(config);
         if (joinType == FlinkJoinType.ANTI || joinType == FlinkJoinType.SEMI) {
             operator =
                     new StreamingSemiAntiJoinOperator(
@@ -211,18 +214,35 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
             boolean leftIsOuter = joinType == FlinkJoinType.LEFT || joinType 
== FlinkJoinType.FULL;
             boolean rightIsOuter =
                     joinType == FlinkJoinType.RIGHT || joinType == 
FlinkJoinType.FULL;
-            operator =
-                    new StreamingJoinOperator(
-                            leftTypeInfo,
-                            rightTypeInfo,
-                            generatedCondition,
-                            leftInputSpec,
-                            rightInputSpec,
-                            leftIsOuter,
-                            rightIsOuter,
-                            joinSpec.getFilterNulls(),
-                            leftStateRetentionTime,
-                            rightStateRetentionTime);
+            if (isMiniBatchEnabled) {
+                operator =
+                        
MiniBatchStreamingJoinOperator.newMiniBatchStreamJoinOperator(
+                                joinType,
+                                leftTypeInfo,
+                                rightTypeInfo,
+                                generatedCondition,
+                                leftInputSpec,
+                                rightInputSpec,
+                                leftIsOuter,
+                                rightIsOuter,
+                                joinSpec.getFilterNulls(),
+                                leftStateRetentionTime,
+                                rightStateRetentionTime,
+                                
MinibatchUtil.createMiniBatchCoTrigger(config));
+            } else {
+                operator =
+                        new StreamingJoinOperator(
+                                leftTypeInfo,
+                                rightTypeInfo,
+                                generatedCondition,
+                                leftInputSpec,
+                                rightInputSpec,
+                                leftIsOuter,
+                                rightIsOuter,
+                                joinSpec.getFilterNulls(),
+                                leftStateRetentionTime,
+                                rightStateRetentionTime);
+            }
         }
 
         final RowType returnType = (RowType) getOutputType();
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
index 9b6ca388093..c2bc36e087f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.MinibatchUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
@@ -172,7 +173,7 @@ public class StreamExecLocalGroupAggregate extends 
StreamExecAggregateBase {
 
         final MapBundleOperator<RowData, RowData, RowData, RowData> operator =
                 new MapBundleOperator<>(
-                        aggFunction, 
AggregateUtil.createMiniBatchTrigger(config), selector);
+                        aggFunction, 
MinibatchUtil.createMiniBatchTrigger(config), selector);
 
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
new file mode 100644
index 00000000000..d6677a18107
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.bundle.trigger.CoBundleTrigger;
+import 
org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
+import 
org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrigger;
+
+/** Utility class for mini-batch related config. */
+public class MinibatchUtil {
+
+    /**
+     * Check if MiniBatch is enabled.
+     *
+     * @param config config
+     * @return true if MiniBatch enabled else false.
+     */
+    public static boolean isMiniBatchEnabled(ReadableConfig config) {
+        return config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
+    }
+
+    /**
+     * Creates a MiniBatch trigger depends on the config for one input.
+     *
+     * @param config config
+     * @return MiniBatch trigger
+     */
+    public static CountBundleTrigger<RowData> 
createMiniBatchTrigger(ReadableConfig config) {
+        long size = miniBatchSize(config);
+        return new CountBundleTrigger<>(size);
+    }
+
+    /**
+     * Creates a MiniBatch trigger depends on the config for two input.
+     *
+     * @param config config
+     * @return MiniBatch trigger
+     */
+    public static CoBundleTrigger<RowData, RowData> createMiniBatchCoTrigger(
+            ReadableConfig config) {
+        long size = miniBatchSize(config);
+        return new CountCoBundleTrigger<>(size);
+    }
+
+    /**
+     * Returns the mini batch size for given config and mixed mode flag, 
considering fallback logic.
+     *
+     * @param config config
+     * @return mini batch size
+     */
+    public static long miniBatchSize(ReadableConfig config) {
+        return config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
index d07dbb5a61e..9de16707e59 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.hint.StateTtlHint
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin
 import 
org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalJoin
-import org.apache.flink.table.planner.plan.utils.JoinUtil
+import org.apache.flink.table.planner.plan.utils.{JoinUtil, MinibatchUtil}
 import org.apache.flink.table.planner.utils.ShortcutUtils.{unwrapClassLoader, 
unwrapTableConfig}
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 
@@ -112,6 +112,7 @@ class StreamPhysicalJoin(
           getUpsertKeys(right, joinSpec.getRightKeys)
         )
       )
+      .itemIf("miniBatch", "true", 
MinibatchUtil.isMiniBatchEnabled(unwrapTableConfig(this)))
   }
 
   override def computeSelfCost(planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index e9fdb74fa9e..aa16c4128d3 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -1105,16 +1105,6 @@ object AggregateUtil extends Enumeration {
     (aggBufferNames ++ distinctBufferNames).toArray
   }
 
-  /** Creates a MiniBatch trigger depends on the config. */
-  def createMiniBatchTrigger(config: ReadableConfig): 
CountBundleTrigger[RowData] = {
-    val size = config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE)
-    if (size <= 0) {
-      throw new IllegalArgumentException(
-        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE + " must be > 0.")
-    }
-    new CountBundleTrigger[RowData](size)
-  }
-
   /** Computes the positions of (window start, window end, row time). */
   private[flink] def computeWindowPropertyPos(
       properties: Seq[NamedWindowProperty]): (Option[Int], Option[Int], 
Option[Int]) = {
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/GroupAggregationAnalyzerTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/GroupAggregationAnalyzerTest.xml
index f90ee4fbab0..8acd1af1787 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/GroupAggregationAnalyzerTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/GroupAggregationAnalyzerTest.xml
@@ -391,7 +391,7 @@ SELECT * FROM r r1, r r2 WHERE r1.a = CAST(r2.b AS BIGINT) 
AND r2.a > 1]]>
     <Resource name="optimized rel plan with advice">
       <![CDATA[
 Calc(select=[c, a, b, c0, a0, b0])
-+- Join(joinType=[InnerJoin], where=[=(a, b00)], select=[c, a, b, c0, a0, b0, 
b00], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey])
++- Join(joinType=[InnerJoin], where=[=(a, b00)], select=[c, a, b, c0, a0, b0, 
b00], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey], 
miniBatch=[true])
    :- Exchange(distribution=[hash[a]])
    :  +- GlobalGroupAggregate(groupBy=[c], select=[c, SUM(sum$0) AS a, 
SUM(sum$1) AS b])
    :     +- Exchange(distribution=[hash[c]])
@@ -419,7 +419,7 @@ SELECT * FROM r r1, r r2 WHERE r1.a = CAST(r2.b AS BIGINT) 
AND r2.a > 1]]>
     <Resource name="optimized rel plan with advice">
       <![CDATA[
 Calc(select=[c, a, b, c0, a0, b0])
-+- Join(joinType=[InnerJoin], where=[=(a, b00)], select=[c, a, b, c0, a0, b0, 
b00], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey])
++- Join(joinType=[InnerJoin], where=[=(a, b00)], select=[c, a, b, c0, a0, b0, 
b00], leftInputSpec=[HasUniqueKey], rightInputSpec=[HasUniqueKey], 
miniBatch=[true])
    :- Exchange(distribution=[hash[a]])
    :  +- GroupAggregate(advice=[1], groupBy=[c], select=[c, SUM(a) AS a, 
SUM(b) AS b])
    :     +- Exchange(distribution=[hash[c]])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 211411cb722..063f9b6637d 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -297,7 +297,7 @@ LogicalProject(a=[$0], b=[$1])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b])
-+- Join(joinType=[InnerJoin], where=[(a0 = a1)], select=[a, b, a0, a1], 
leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey])
++- Join(joinType=[InnerJoin], where=[(a0 = a1)], select=[a, b, a0, a1], 
leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey], 
miniBatch=[true])
    :- Exchange(distribution=[hash[a0]])
    :  +- Calc(select=[a, b, CAST(a AS BIGINT) AS a0])
    :     +- GlobalGroupAggregate(groupBy=[a], select=[a, COUNT(count$0) AS b])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index 1a4d0b5e002..526804bdedf 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -591,7 +591,7 @@ LogicalSink(table=[default_catalog.default_database.sink], 
fields=[a, b, d])
 == Optimized Physical Plan ==
 Sink(table=[default_catalog.default_database.sink], fields=[a, b, d])
 +- Calc(select=[a, b, d])
-   +- Join(joinType=[InnerJoin], where=[=(a, c)], select=[a, b, c, d], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
+   +- Join(joinType=[InnerJoin], where=[=(a, c)], select=[a, b, c, d], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], miniBatch=[true])
       :- Exchange(distribution=[hash[a]])
       :  +- MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])
       :     +- TableSourceScan(table=[[default_catalog, default_database, 
left_table]], fields=[a, b])
@@ -602,7 +602,7 @@ Sink(table=[default_catalog.default_database.sink], 
fields=[a, b, d])
 == Optimized Execution Plan ==
 Sink(table=[default_catalog.default_database.sink], fields=[a, b, d])
 +- Calc(select=[a, b, d])
-   +- Join(joinType=[InnerJoin], where=[(a = c)], select=[a, b, c, d], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])
+   +- Join(joinType=[InnerJoin], where=[(a = c)], select=[a, b, c, d], 
leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], miniBatch=[true])
       :- Exchange(distribution=[hash[a]])
       :  +- MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])
       :     +- TableSourceScan(table=[[default_catalog, default_database, 
left_table]], fields=[a, b])
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index 3b25e1e8644..a2d1abd3d67 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -25,6 +25,7 @@ import 
org.apache.flink.table.planner.expressions.utils.FuncWithOpen
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils._
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import 
org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchMode
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension
 import org.apache.flink.types.Row
@@ -36,7 +37,8 @@ import org.junit.jupiter.api.extension.ExtendWith
 import scala.collection.{mutable, Seq}
 
 @ExtendWith(Array(classOf[ParameterizedTestExtension]))
-class JoinITCase(state: StateBackendMode) extends 
StreamingWithStateTestBase(state) {
+class JoinITCase(miniBatch: MiniBatchMode, state: StateBackendMode)
+  extends StreamingWithMiniBatchTestBase(miniBatch, state) {
 
   val smallTuple5Data = List(
     (1, 1L, 0, "Hallo", 1L),
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
index 6aa4635971d..630d012b72d 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
@@ -26,6 +26,7 @@ import 
org.apache.flink.table.api.internal.TableEnvironmentInternal
 import org.apache.flink.table.planner.expressions.utils.FuncWithOpen
 import org.apache.flink.table.planner.runtime.utils._
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct,
 WeightedAvg}
+import 
org.apache.flink.table.planner.runtime.utils.StreamingWithMiniBatchTestBase.MiniBatchMode
 import 
org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.table.planner.utils.CountAggFunction
@@ -39,7 +40,8 @@ import org.junit.jupiter.api.extension.ExtendWith
 import java.time.Duration
 
 @ExtendWith(Array(classOf[ParameterizedTestExtension]))
-class JoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase(mode) {
+class JoinITCase(miniBatch: MiniBatchMode, mode: StateBackendMode)
+  extends StreamingWithMiniBatchTestBase(miniBatch, mode) {
 
   val data2 = List(
     (1, 1L, "Hi"),

Reply via email to