This is an automated email from the ASF dual-hosted git repository.
ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new de2d175decc [FLINK-34353][table-planner] Fix unclear exception when
mini-batch optimization is enabled without setting the mini-batch size
de2d175decc is described below
commit de2d175decce8defeb7931f449392e47d637e2c4
Author: xuyang <[email protected]>
AuthorDate: Sun Feb 4 18:17:33 2024 +0800
[FLINK-34353][table-planner] Fix unclear exception when mini-batch
optimization is enabled without setting the mini-batch size
This closes #24264
---
.../table/planner/plan/utils/MinibatchUtil.java | 7 +++++-
.../planner/plan/stream/sql/agg/AggregateTest.xml | 21 +++++++++--------
.../plan/stream/sql/agg/AggregateTest.scala | 27 +++++++++++++++++++++-
.../planner/plan/stream/sql/join/JoinTest.scala | 25 ++++++++++++++++++++
4 files changed, 69 insertions(+), 11 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
index d6677a18107..991fac72181 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MinibatchUtil.java
@@ -68,6 +68,11 @@ public class MinibatchUtil {
* @return mini batch size
*/
public static long miniBatchSize(ReadableConfig config) {
- return config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE);
+ long size =
config.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE);
+ if (size <= 0) {
+ throw new IllegalArgumentException(
+ ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE + " must
be > 0.");
+ }
+ return size;
}
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index 55f72f321ad..4a36ab45186 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -139,18 +139,21 @@ GlobalGroupAggregate(groupBy=[a], select=[a, SUM(sum$0)
AS EXPR$1, COUNT(distinc
</Resource>
</TestCase>
<TestCase name="testAggWithMiniBatch">
- <Resource name="sql">
- <![CDATA[SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable GROUP
BY b]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[MAX($0)],
EXPR$3=[SUM($2)])
+- LogicalProject(b=[$1], a=[$0], c=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c, proctime, rowtime)]]])
-]]>
- </Resource>
- <Resource name="optimized exec plan">
- <![CDATA[
+
+== Optimized Physical Plan ==
+GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS
EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
++- Exchange(distribution=[hash[b]])
+ +- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS
count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
+ +- Calc(select=[b, a, c])
+ +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
+ +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c, proctime,
rowtime)]]], fields=[a, b, c, proctime, rowtime])
+
+== Optimized Execution Plan ==
GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS
EXPR$1, MAX(max$1) AS EXPR$2, SUM(sum$2) AS EXPR$3])
+- Exchange(distribution=[hash[b]])
+- LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS
count$0, MAX(b) AS max$1, SUM(c) AS sum$2, DISTINCT(a) AS distinct$0])
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index adf082b857f..affe9b33ecd 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -100,7 +100,32 @@ class AggregateTest extends TableTestBase {
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
Boolean.box(true))
util.tableEnv.getConfig
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
Duration.ofSeconds(1))
- util.verifyExecPlan("SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM
MyTable GROUP BY b")
+ util.tableEnv.getConfig
+ .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(5000L))
+ util.verifyExplain("SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM
MyTable GROUP BY b")
+ }
+
+ @Test
+ def testMiniBatchAggWithNegativeMiniBatchSize(): Unit = {
+ util.tableEnv.getConfig
+ .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
Boolean.box(true))
+ util.tableEnv.getConfig
+ .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
Duration.ofSeconds(1))
+
+ val sql = "SELECT b, COUNT(DISTINCT a), MAX(b), SUM(c) FROM MyTable GROUP
BY b";
+ // without setting mini-batch size
+ assertThatThrownBy(() => util.verifyExplain(sql))
+ .hasMessage(
+ "Key: 'table.exec.mini-batch.size' , default: -1 (fallback keys: [])
must be > 0.")
+ .isInstanceOf[IllegalArgumentException]
+
+ // set negative mini-batch size
+ util.tableEnv.getConfig.getConfiguration
+ .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(-500L))
+ assertThatThrownBy(() => util.verifyExplain(sql))
+ .hasMessage(
+ "Key: 'table.exec.mini-batch.size' , default: -1 (fallback keys: [])
must be > 0.")
+ .isInstanceOf[IllegalArgumentException]
}
@Test
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
index 4f50c1b3d20..221ecb0edce 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.api._
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableFunc1,
TableTestBase}
+import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test
import java.time.Duration
@@ -662,4 +663,28 @@ class JoinTest extends TableTestBase {
|""".stripMargin
)
}
+
+ @Test
+ def testMiniBatchJoinWithNegativeMiniBatchSize(): Unit = {
+ util.tableEnv.getConfig.getConfiguration
+ .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
Boolean.box(true))
+ util.tableEnv.getConfig.getConfiguration
+ .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
Duration.ofSeconds(10))
+
+ val sql = "SELECT * FROM A JOIN B ON a1 = b1"
+
+ // without setting mini-batch size
+ assertThatThrownBy(() => util.verifyExplain(sql))
+ .hasMessage(
+ "Key: 'table.exec.mini-batch.size' , default: -1 (fallback keys: [])
must be > 0.")
+ .isInstanceOf[IllegalArgumentException]
+
+ // set negative mini-batch size
+ util.tableEnv.getConfig.getConfiguration
+ .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(-500L))
+ assertThatThrownBy(() => util.verifyExplain(sql))
+ .hasMessage(
+ "Key: 'table.exec.mini-batch.size' , default: -1 (fallback keys: [])
must be > 0.")
+ .isInstanceOf[IllegalArgumentException]
+ }
}