twalthr commented on a change in pull request #18135:
URL: https://github.com/apache/flink/pull/18135#discussion_r772260255



##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
##########
@@ -425,6 +426,80 @@ class AggregateITCase(
     assertEquals(expected, sink.getRetractResults)
   }
 
+  @Test
+  def testPrecisionForSumWithRetractAggregationOnDecimal(): Unit = {
+    val upsertSourceCurrencyData = List(
+      changelogRow("+I", 1.03520274.bigDecimal, 12345.035202748654.bigDecimal,
+        12.345678901234567.bigDecimal, "a"),
+      changelogRow("+I", 1.03520274.bigDecimal, 12345.035202748654.bigDecimal,
+        12.345678901234567.bigDecimal, "b"),
+      changelogRow("-D", 1.03520274.bigDecimal, 12345.035202748654.bigDecimal,
+        12.345678901234567.bigDecimal, "b"),
+      changelogRow("+I", 2.13520275.bigDecimal, 21245.542202748654.bigDecimal,
+        242.78594201234567.bigDecimal, "a"),
+      changelogRow("+I", 1.11111111.bigDecimal, 11111.111111111111.bigDecimal,
+        111.11111111111111.bigDecimal, "b"),
+      changelogRow("+I", 1.11111111.bigDecimal, 11111.111111111111.bigDecimal,
+        111.11111111111111.bigDecimal, "a"),
+      changelogRow("-D", 1.11111111.bigDecimal, 11111.111111111111.bigDecimal,
+        111.11111111111111.bigDecimal, "b"),
+      changelogRow("+I", 2.13520275.bigDecimal, 21245.542202748654.bigDecimal,
+        242.78594201234567.bigDecimal, "a"))
+
+    val upsertSourceDataId = registerData(upsertSourceCurrencyData);
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE T (
+         | `a` DECIMAL(32, 8),
+         | `b` DECIMAL(32, 20),
+         | `c` DECIMAL(32, 20),
+         | `d` STRING
+         |) WITH (
+         | 'connector' = 'values',
+         | 'data-id' = '${upsertSourceDataId}',
+         | 'changelog-mode' = 'I,D',
+         | 'failing-source' = 'true'
+         |)
+         |""".stripMargin)
+
+    val sql = "SELECT sum(a), sum(b), sum(c) FROM T GROUP BY d"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
+    env.execute()
+
+    val expected =
+      List("6.41671935,65947.23071935707000000000,609.02867403703699700000")

Review comment:
       can you add a comment to explain the result? As a reviewer who is no longer super familiar with this topic, I find it difficult to read anything into those numbers.
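For what it's worth, the mechanics behind the expected values can be sketched as follows (illustrative Java, not the PR's code; `sumByKey` and the sample rows are invented for the sketch): each `-D` row retracts the matching `+I`, so every row for key `b` cancels out, and each expected value is simply the `BigDecimal` sum of the rows for key `a`.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class RetractSumSketch {
    // Per-key running sum over a changelog: "+I" adds the value, "-D" subtracts it.
    static Map<String, BigDecimal> sumByKey(String[][] rows) {
        Map<String, BigDecimal> sums = new HashMap<>();
        for (String[] row : rows) {
            BigDecimal v = new BigDecimal(row[2]);
            sums.merge(row[1], row[0].equals("-D") ? v.negate() : v, BigDecimal::add);
        }
        return sums;
    }

    public static void main(String[] args) {
        // (op, key, value) rows mirroring the changelog shape of the test,
        // with simpler illustrative numbers (not the PR's values).
        String[][] rows = {
            {"+I", "a", "1.10"},
            {"+I", "b", "2.20"},
            {"-D", "b", "2.20"}, // the deletion retracts the matching insertion
            {"+I", "a", "3.30"},
        };
        System.out.println(sumByKey(rows)); // b nets to 0.00; only a's sum survives
    }
}
```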

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
##########
@@ -762,10 +762,12 @@
                     .build();
 
     /**
-     * Special "+" operator used internally by {@code SumAggFunction} to implement SUM aggregation
-     * on a Decimal type. Uses the {@link LogicalTypeMerging#findSumAggType(LogicalType)} to avoid
-     * the normal {@link #PLUS} override the special calculation for precision and scale needed by
-     * SUM.
+     * Special "+" operator used internally by {@code SumAggFunction}, {@code Sum0AggFunction},
Review comment:
       remove `IncrSumAggFunction`. How about we keep this section a bit more generic so we don't have to update it whenever the implementation changes? Maybe just `used internally for implementing SUM/AVG aggregations (with and without retractions)`.
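For context on why a dedicated operator is needed at all, here is a hand-written sketch of the two type-derivation rules being contrasted (the helper names `plusType`/`sumAggType` are illustrative, not Flink's API; the assumed behavior is that regular decimal addition grows precision by one per step, while the SUM aggregate type is fixed once at the maximum precision):

```java
import java.util.Arrays;

public class DecimalPlusSketch {
    // Sketch of a SQL-style PLUS derivation, capped at precision 38:
    // result scale is the max input scale, and precision grows by one per addition.
    static int[] plusType(int p1, int s1, int p2, int s2) {
        int s = Math.max(s1, s2);
        int p = Math.min(38, Math.max(p1 - s1, p2 - s2) + s + 1);
        return new int[] {p, s};
    }

    // Sketch of a SUM aggregate type: pin precision at the maximum once and
    // keep the input scale, so the accumulator type stays stable across rows.
    static int[] sumAggType(int p, int s) {
        return new int[] {38, s};
    }

    public static void main(String[] args) {
        // One DECIMAL(32, 8) + DECIMAL(32, 8) under the PLUS rule...
        System.out.println(Arrays.toString(plusType(32, 8, 32, 8))); // [33, 8]
        // ...versus the fixed aggregate type used for SUM.
        System.out.println(Arrays.toString(sumAggType(32, 8)));      // [38, 8]
    }
}
```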

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/AvgAggFunction.java
##########
@@ -106,6 +104,16 @@ public Expression getValueExpression() {
         return ifThenElse(equalTo(count, literal(0L)), ifTrue, ifFalse);
     }
 
+    protected UnresolvedCallExpression doPlus(

Review comment:
       `doPlus` sounds a bit generic, how about `specializedPlus` or `adjustedPlus`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

