asfgit closed pull request #7284: [FLINK-11136] [table] Fix the merge logic of 
DISTINCT aggregates
URL: https://github.com/apache/flink/pull/7284
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
index 566e3d7cbc5..57cc815fee0 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
@@ -142,6 +142,21 @@ class AggregationCodeGenerator(
       fields.mkString(", ")
     }
 
+    val parametersCodeForDistinctMerge = aggFields.map { inFields =>
+      val fields = inFields.filter(_ > -1).zipWithIndex.map { case (f, i) =>
+        // index to constant
+        if (f >= physicalInputTypes.length) {
+          constantFields(f - physicalInputTypes.length)
+        }
+        // index to input field
+        else {
+          s"(${CodeGenUtils.boxedTypeTermForTypeInfo(physicalInputTypes(f))}) 
k.getField($i)"
+        }
+      }
+
+      fields.mkString(", ")
+    }
+
     // get method signatures
     val classes = UserDefinedFunctionUtils.typeInfoToClass(physicalInputTypes)
     val constantClasses = 
UserDefinedFunctionUtils.typeInfoToClass(constantTypes)
@@ -643,7 +658,7 @@ class AggregationCodeGenerator(
                |          (${classOf[Row].getCanonicalName}) entry.getKey();
                |      Long v = (Long) entry.getValue();
                |      if (aDistinctAcc$i.add(k, v)) {
-               |        ${aggs(i)}.accumulate(aAcc$i, k);
+               |        ${aggs(i)}.accumulate(aAcc$i, 
${parametersCodeForDistinctMerge(i)});
                |      }
                |    }
                |    a.setField($i, aDistinctAcc$i);
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 46dde8e0225..ddc2a687541 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -78,6 +78,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT c, " +
       "  COUNT(DISTINCT b)," +
+      "  SUM(DISTINCT b)," +
       "  SESSION_END(rowtime, INTERVAL '0.005' SECOND) " +
       "FROM MyTable " +
       "GROUP BY SESSION(rowtime, INTERVAL '0.005' SECOND), c "
@@ -87,9 +88,9 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.execute()
 
     val expected = Seq(
-      "Hello World,1,1970-01-01 00:00:00.014", // window starts at [9L] till 
{14L}
-      "Hello,1,1970-01-01 00:00:00.021",       // window starts at [16L] till 
{21L}, not merged
-      "Hello,3,1970-01-01 00:00:00.015"        // window starts at [1L,2L],
+      "Hello World,1,9,1970-01-01 00:00:00.014", // window starts at [9L] till 
{14L}
+      "Hello,1,16,1970-01-01 00:00:00.021",       // window starts at [16L] 
till {21L}, not merged
+      "Hello,3,6,1970-01-01 00:00:00.015"        // window starts at [1L,2L],
                                                //   merged with [8L,10L], by 
[4L], till {15L}
     )
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to