twalthr commented on a change in pull request #18135:
URL: https://github.com/apache/flink/pull/18135#discussion_r772260255
##########
File path:
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
##########
@@ -425,6 +426,80 @@ class AggregateITCase(
assertEquals(expected, sink.getRetractResults)
}
+ @Test
+ def testPrecisionForSumWithRetractAggregationOnDecimal(): Unit = {
+ val upsertSourceCurrencyData = List(
+ changelogRow("+I", 1.03520274.bigDecimal, 12345.035202748654.bigDecimal,
+ 12.345678901234567.bigDecimal, "a"),
+ changelogRow("+I", 1.03520274.bigDecimal, 12345.035202748654.bigDecimal,
+ 12.345678901234567.bigDecimal, "b"),
+ changelogRow("-D", 1.03520274.bigDecimal, 12345.035202748654.bigDecimal,
+ 12.345678901234567.bigDecimal, "b"),
+ changelogRow("+I", 2.13520275.bigDecimal, 21245.542202748654.bigDecimal,
+ 242.78594201234567.bigDecimal, "a"),
+ changelogRow("+I", 1.11111111.bigDecimal, 11111.111111111111.bigDecimal,
+ 111.11111111111111.bigDecimal, "b"),
+ changelogRow("+I", 1.11111111.bigDecimal, 11111.111111111111.bigDecimal,
+ 111.11111111111111.bigDecimal, "a"),
+ changelogRow("-D", 1.11111111.bigDecimal, 11111.111111111111.bigDecimal,
+ 111.11111111111111.bigDecimal, "b"),
+ changelogRow("+I", 2.13520275.bigDecimal, 21245.542202748654.bigDecimal,
+ 242.78594201234567.bigDecimal, "a"))
+
+ val upsertSourceDataId = registerData(upsertSourceCurrencyData);
+ tEnv.executeSql(
+ s"""
+ |CREATE TABLE T (
+ | `a` DECIMAL(32, 8),
+ | `b` DECIMAL(32, 20),
+ | `c` DECIMAL(32, 20),
+ | `d` STRING
+ |) WITH (
+ | 'connector' = 'values',
+ | 'data-id' = '${upsertSourceDataId}',
+ | 'changelog-mode' = 'I,D',
+ | 'failing-source' = 'true'
+ |)
+ |""".stripMargin)
+
+ val sql = "SELECT sum(a), sum(b), sum(c) FROM T GROUP BY d"
+
+ val sink = new TestingRetractSink
+ tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
+ env.execute()
+
+ val expected =
List("6.41671935,65947.23071935707000000000,609.02867403703699700000")
Review comment:
can you add a comment to explain the result? for me as a reviewer that
is not super familiar with the topic anymore, it is difficult to see something
in those numbers
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
##########
@@ -762,10 +762,12 @@
.build();
/**
- * Special "+" operator used internally by {@code SumAggFunction} to
implement SUM aggregation
- * on a Decimal type. Uses the {@link
LogicalTypeMerging#findSumAggType(LogicalType)} to avoid
- * the normal {@link #PLUS} override the special calculation for precision
and scale needed by
- * SUM.
+ * Special "+" operator used internally by {@code SumAggFunction}, {@code
Sum0AggFunction},
Review comment:
remove `IncrSumAggFunction`. how about we keep this section a bit more
generic to not update it whenever we update the implementation? maybe just
`used internally by for implementing SUM/AVG aggregations (with and without
retractions)`
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/AvgAggFunction.java
##########
@@ -106,6 +104,16 @@ public Expression getValueExpression() {
return ifThenElse(equalTo(count, literal(0L)), ifTrue, ifFalse);
}
+ protected UnresolvedCallExpression doPlus(
Review comment:
`doPlus` sounds a bit generic, how about `specializedPlus` or
`adjustedPlus`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]