This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 68e0ee8 [FLINK-19739][table-runtime-blink] Fix compile exception for
window aggregate in batch jobs
68e0ee8 is described below
commit 68e0ee84e908c7ed64a9f8ae2dbf6823e4106d71
Author: tsreaper <[email protected]>
AuthorDate: Wed Jul 28 19:25:21 2021 +0800
[FLINK-19739][table-runtime-blink] Fix compile exception for window
aggregate in batch jobs
This closes #16617
---
.../agg/batch/HashWindowCodeGenerator.scala | 11 ++++++-
.../codegen/agg/batch/WindowCodeGenerator.scala | 12 ++-----
.../runtime/batch/sql/agg/GroupWindowITCase.scala | 37 ++++++++++++++++++++++
3 files changed, 49 insertions(+), 11 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
index 864e71b..cee7229 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
@@ -23,8 +23,9 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.runtime.operators.sort.QuickSort
import org.apache.flink.streaming.api.operators.OneInputStreamOperator
import org.apache.flink.table.api.Types
-import org.apache.flink.table.data.RowData
+import org.apache.flink.table.data.{GenericRowData, RowData}
import org.apache.flink.table.data.binary.BinaryRowData
+import org.apache.flink.table.data.utils.JoinedRowData
import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_ROW,
newName}
import
org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
import org.apache.flink.table.planner.codegen._
@@ -612,6 +613,14 @@ class HashWindowCodeGenerator(
""".stripMargin
}
+ private def getOutputRowClass: Class[_ <: RowData] = {
+ if (namedProperties.isEmpty && grouping.isEmpty && isFinal) {
+ classOf[GenericRowData]
+ } else {
+ classOf[JoinedRowData]
+ }
+ }
+
private def genOutputDirectly(
windowSize: Long,
inputTerm: String,
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
index a712e71..f0b5b39 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
@@ -22,12 +22,12 @@ import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.data.utils.JoinedRowData
-import org.apache.flink.table.data.{GenericRowData, RowData}
+import org.apache.flink.table.data.GenericRowData
import org.apache.flink.table.expressions.ExpressionUtils.extractValue
import org.apache.flink.table.expressions.{Expression, ValueLiteralExpression}
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.planner.JLong
-import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty;
+import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_ROW,
TIMESTAMP_DATA, boxedTypeTermForType, newName}
import org.apache.flink.table.planner.codegen.GenerateUtils.generateFieldAccess
@@ -97,14 +97,6 @@ abstract class WindowCodeGenerator(
(groupKeyRowType.getChildren :+ timestampInternalType).toArray,
(groupKeyRowType.getFieldNames :+ "assignedTs$").toArray)
- def getOutputRowClass: Class[_ <: RowData] = {
- if (namedProperties.isEmpty && grouping.isEmpty) {
- classOf[GenericRowData]
- } else {
- classOf[JoinedRowData]
- }
- }
-
private[flink] def getWindowsGroupingElementInfo(
enablePreAccumulate: Boolean = true): RowType = {
if (enablePreAccumulate) {
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupWindowITCase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupWindowITCase.scala
index fa717d6..27016d0 100644
---
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupWindowITCase.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupWindowITCase.scala
@@ -22,14 +22,18 @@ import
org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_T
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo.LOCAL_DATE_TIME
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.table.planner.runtime.utils.TestData._
import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime
import org.apache.flink.table.planner.utils.{CountAggFunction,
IntAvgAggFunction, IntSumAggFunction}
+import org.apache.flink.types.Row
import org.junit.{Before, Test}
+import java.time.LocalDateTime
+
class GroupWindowITCase extends BatchTestBase {
@Before
@@ -641,4 +645,37 @@ class GroupWindowITCase extends BatchTestBase {
checkResult(sqlQuery, Seq())
}
+
+ @Test
+ def testLocalGlobalWindowAggregateWithoutGroupingAndNamedProperties(): Unit
= {
+ val data: Seq[Row] = Seq(
+ row(1, LocalDateTime.of(2021, 7, 26, 0, 0, 0)),
+ row(2, LocalDateTime.of(2021, 7, 26, 0, 0, 3)),
+ row(3, LocalDateTime.of(2021, 7, 26, 0, 0, 6)),
+ row(4, LocalDateTime.of(2021, 7, 26, 0, 0, 4)),
+ row(5, LocalDateTime.of(2021, 7, 26, 0, 0, 5)),
+ row(6, LocalDateTime.of(2021, 7, 26, 0, 0, 8)),
+ row(7, LocalDateTime.of(2021, 7, 26, 0, 0, 10)))
+ val dataId = TestValuesTableFactory.registerData(data)
+ val ddl =
+ s"""
+ |CREATE TABLE MyTable (
+ | a INT,
+ | ts TIMESTAMP
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '$dataId',
+ | 'bounded' = 'true'
+ |)
+ |""".stripMargin
+
+ tEnv.executeSql(ddl)
+ checkResult(
+ """
+ |SELECT sum(a) FROM MyTable
+ |GROUP BY
+ |TUMBLE(ts, interval '5' seconds)
+ |""".stripMargin,
+ Seq(row(14), row(7), row(7)))
+ }
}