This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new de2d175decc [FLINK-34353][table-planner] Fix unclear exception without 
setting minibatch size when enable minibatch optimization
de2d175decc is described below

commit de2d175decce8defeb7931f449392e47d637e2c4
Author: xuyang <[email protected]>
AuthorDate: Sun Feb 4 18:17:33 2024 +0800

    [FLINK-34353][table-planner] Fix unclear exception without setting 
minibatch size when enable minibatch optimization
    
    This closes #24264
---
 .../table/planner/plan/utils/MinibatchUtil.java    |  7 +++++-
 .../planner/plan/stream/sql/agg/AggregateTest.xml  | 21 +++++++++--------
 .../plan/stream/sql/agg/AggregateTest.scala        | 27 +++++++++++++++++++++-
 .../planner/plan/stream/sql/join/JoinTest.scala    | 25 ++++++++++++++++++++
 4 files changed, 69 insertions(+), 11 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
index d6677a18107..991fac72181 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
@@ -68,6 +68,11 @@ public class MinibatchUtil {
      * @return mini batch size
      */
     public static long miniBatchSize(ReadableConfig config) {
-        return config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE);
+        long size = 
config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE);
+        if (size <= 0) {
+            throw new IllegalArgumentException(
+                    ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE + " must 
be > 0.");
+        }
+        return size;
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index 55f72f321ad..4a36ab45186 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -139,18 +139,21 @@ GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0) 
AS EXPR$1, COUNT(distinc
     </Resource>
   </TestCase>
   <TestCase name="testAggWithMiniBatch">
-    <Resource name="sql">
-      <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c)  FROM MyTable GROUP 
BY b]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
 LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)], 
EXPR$3=[SUM($2)])
 +- LogicalProject(b=[$1], a=[$0], c=[$2])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, proctime, rowtime)]]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
+
+== Optimized Physical Plan ==
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS 
EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
++- Exchange(distribution=[hash[b]])
+   +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS 
count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+      +- Calc(select=[b, a, c])
+         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+            +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c, proctime, 
rowtime)]]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
 GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS 
EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
 +- Exchange(distribution=[hash[b]])
    +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS 
count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index adf082b857f..affe9b33ecd 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -100,7 +100,32 @@ class AggregateTest extends TableTestBase {
       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
Boolean.box(true))
     util.tableEnv.getConfig
       .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(1))
-    util.verifyExecPlan("SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c)  FROM 
MyTable GROUP BY b")
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(5000L))
+    util.verifyExplain("SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM 
MyTable GROUP BY b")
+  }
+
+  @Test
+  def testMiniBatchAggWithNegativeMiniBatchSize(): Unit = {
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
Boolean.box(true))
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(1))
+
+    val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable GROUP 
BY b";
+    // without setting mini-batch size
+    assertThatThrownBy(() => util.verifyExplain(sql))
+      .hasMessage(
+        "Key: 'table.exec.mini-batch.size' , default: -1 (fallback keys: []) 
must be > 0.")
+      .isInstanceOf[IllegalArgumentException]
+
+    // set negative mini-batch size
+    util.tableEnv.getConfig.getConfiguration
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(-500L))
+    assertThatThrownBy(() => util.verifyExplain(sql))
+      .hasMessage(
+        "Key: 'table.exec.mini-batch.size' , default: -1 (fallback keys: []) 
must be > 0.")
+      .isInstanceOf[IllegalArgumentException]
   }
 
   @Test
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
index 4f50c1b3d20..221ecb0edce 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.api._
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableFunc1, 
TableTestBase}
 
+import org.assertj.core.api.Assertions.assertThatThrownBy
 import org.junit.jupiter.api.Test
 
 import java.time.Duration
@@ -662,4 +663,28 @@ class JoinTest extends TableTestBase {
         |""".stripMargin
     )
   }
+
+  @Test
+  def testMiniBatchJoinWithNegativeMiniBatchSize(): Unit = {
+    util.tableEnv.getConfig.getConfiguration
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
Boolean.box(true))
+    util.tableEnv.getConfig.getConfiguration
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(10))
+
+    val sql = "SELECT * FROM A JOIN B ON a1 = b1"
+
+    // without setting mini-batch size
+    assertThatThrownBy(() => util.verifyExplain(sql))
+      .hasMessage(
+        "Key: 'table.exec.mini-batch.size' , default: -1 (fallback keys: []) 
must be > 0.")
+      .isInstanceOf[IllegalArgumentException]
+
+    // set negative mini-batch size
+    util.tableEnv.getConfig.getConfiguration
+      .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(-500L))
+    assertThatThrownBy(() => util.verifyExplain(sql))
+      .hasMessage(
+        "Key: 'table.exec.mini-batch.size' , default: -1 (fallback keys: []) 
must be > 0.")
+      .isInstanceOf[IllegalArgumentException]
+  }
 }

Reply via email to