This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 68e0ee8  [FLINK-19739][table-runtime-blink] Fix compile exception for window aggregate in batch jobs
68e0ee8 is described below

commit 68e0ee84e908c7ed64a9f8ae2dbf6823e4106d71
Author: tsreaper <[email protected]>
AuthorDate: Wed Jul 28 19:25:21 2021 +0800

    [FLINK-19739][table-runtime-blink] Fix compile exception for window aggregate in batch jobs
    
    This closes #16617
---
 .../agg/batch/HashWindowCodeGenerator.scala        | 11 ++++++-
 .../codegen/agg/batch/WindowCodeGenerator.scala    | 12 ++-----
 .../runtime/batch/sql/agg/GroupWindowITCase.scala  | 37 ++++++++++++++++++++++
 3 files changed, 49 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
index 864e71b..cee7229 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashWindowCodeGenerator.scala
@@ -23,8 +23,9 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo
 import org.apache.flink.runtime.operators.sort.QuickSort
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.table.api.Types
-import org.apache.flink.table.data.RowData
+import org.apache.flink.table.data.{GenericRowData, RowData}
 import org.apache.flink.table.data.binary.BinaryRowData
+import org.apache.flink.table.data.utils.JoinedRowData
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_ROW, 
newName}
 import 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect
 import org.apache.flink.table.planner.codegen._
@@ -612,6 +613,14 @@ class HashWindowCodeGenerator(
        """.stripMargin
   }
 
+  private def getOutputRowClass: Class[_ <: RowData] = {
+    if (namedProperties.isEmpty && grouping.isEmpty && isFinal) {
+      classOf[GenericRowData]
+    } else {
+      classOf[JoinedRowData]
+    }
+  }
+
   private def genOutputDirectly(
       windowSize: Long,
       inputTerm: String,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
index a712e71..f0b5b39 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
@@ -22,12 +22,12 @@ import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.data.binary.BinaryRowData
 import org.apache.flink.table.data.utils.JoinedRowData
-import org.apache.flink.table.data.{GenericRowData, RowData}
+import org.apache.flink.table.data.GenericRowData
 import org.apache.flink.table.expressions.ExpressionUtils.extractValue
 import org.apache.flink.table.expressions.{Expression, ValueLiteralExpression}
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.planner.JLong
-import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty;
+import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{BINARY_ROW, 
TIMESTAMP_DATA, boxedTypeTermForType, newName}
 import org.apache.flink.table.planner.codegen.GenerateUtils.generateFieldAccess
@@ -97,14 +97,6 @@ abstract class WindowCodeGenerator(
     (groupKeyRowType.getChildren :+ timestampInternalType).toArray,
     (groupKeyRowType.getFieldNames :+ "assignedTs$").toArray)
 
-  def getOutputRowClass: Class[_ <: RowData] = {
-    if (namedProperties.isEmpty && grouping.isEmpty) {
-      classOf[GenericRowData]
-    } else {
-      classOf[JoinedRowData]
-    }
-  }
-
   private[flink] def getWindowsGroupingElementInfo(
       enablePreAccumulate: Boolean = true): RowType = {
     if (enablePreAccumulate) {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupWindowITCase.scala
index fa717d6..27016d0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupWindowITCase.scala
@@ -22,14 +22,18 @@ import 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_T
 import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo.LOCAL_DATE_TIME
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime
 import org.apache.flink.table.planner.utils.{CountAggFunction, 
IntAvgAggFunction, IntSumAggFunction}
+import org.apache.flink.types.Row
 
 import org.junit.{Before, Test}
 
+import java.time.LocalDateTime
+
 class GroupWindowITCase extends BatchTestBase {
 
   @Before
@@ -641,4 +645,37 @@ class GroupWindowITCase extends BatchTestBase {
 
     checkResult(sqlQuery, Seq())
   }
+
+  @Test
+  def testLocalGlobalWindowAggregateWithoutGroupingAndNamedProperties(): Unit 
= {
+    val data: Seq[Row] = Seq(
+      row(1, LocalDateTime.of(2021, 7, 26, 0, 0, 0)),
+      row(2, LocalDateTime.of(2021, 7, 26, 0, 0, 3)),
+      row(3, LocalDateTime.of(2021, 7, 26, 0, 0, 6)),
+      row(4, LocalDateTime.of(2021, 7, 26, 0, 0, 4)),
+      row(5, LocalDateTime.of(2021, 7, 26, 0, 0, 5)),
+      row(6, LocalDateTime.of(2021, 7, 26, 0, 0, 8)),
+      row(7, LocalDateTime.of(2021, 7, 26, 0, 0, 10)))
+    val dataId = TestValuesTableFactory.registerData(data)
+    val ddl =
+      s"""
+         |CREATE TABLE MyTable (
+         |  a INT,
+         |  ts TIMESTAMP
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId',
+         |  'bounded' = 'true'
+         |)
+         |""".stripMargin
+
+    tEnv.executeSql(ddl)
+    checkResult(
+      """
+        |SELECT sum(a) FROM MyTable
+        |GROUP BY
+        |TUMBLE(ts, interval '5' seconds)
+        |""".stripMargin,
+      Seq(row(14), row(7), row(7)))
+  }
 }

Reply via email to