This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2686d48fc3 [core] Fix data corruption for FieldSumAgg (#7992)
2686d48fc3 is described below
commit 2686d48fc3b97b6ba326a444fcfacf6e419c89f0
Author: Arnav Balyan <[email protected]>
AuthorDate: Sat Jun 6 14:12:46 2026 +0530
[core] Fix data corruption for FieldSumAgg (#7992)
---
.../mergetree/compact/aggregate/FieldSumAgg.java | 126 ++++++++++++++++++---
.../compact/aggregate/FieldAggregatorTest.java | 81 +++++++++++++
2 files changed, 194 insertions(+), 13 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
index 4b3ad12aea..e9d868e082 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
@@ -22,7 +22,7 @@ import org.apache.paimon.data.Decimal;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.DecimalUtils;
-/** sum aggregate a field of a row. */
+/** Sum aggregate a field of a row. */
public class FieldSumAgg extends FieldAggregator {
private static final long serialVersionUID = 1L;
@@ -55,16 +55,16 @@ public class FieldSumAgg extends FieldAggregator {
mergeFieldDD.scale());
break;
case TINYINT:
- sum = (byte) ((byte) accumulator + (byte) inputField);
+ sum = addExactByte((byte) accumulator, (byte) inputField);
break;
case SMALLINT:
- sum = (short) ((short) accumulator + (short) inputField);
+ sum = addExactShort((short) accumulator, (short) inputField);
break;
case INTEGER:
- sum = (int) accumulator + (int) inputField;
+ sum = addExactInt((int) accumulator, (int) inputField);
break;
case BIGINT:
- sum = (long) accumulator + (long) inputField;
+ sum = addExactLong((long) accumulator, (long) inputField);
break;
case FLOAT:
sum = (float) accumulator + (float) inputField;
@@ -105,16 +105,16 @@ public class FieldSumAgg extends FieldAggregator {
mergeFieldDD.scale());
break;
case TINYINT:
- sum = (byte) ((byte) accumulator - (byte) inputField);
+ sum = subtractExactByte((byte) accumulator, (byte) inputField);
break;
case SMALLINT:
- sum = (short) ((short) accumulator - (short) inputField);
+ sum = subtractExactShort((short) accumulator, (short)
inputField);
break;
case INTEGER:
- sum = (int) accumulator - (int) inputField;
+ sum = subtractExactInt((int) accumulator, (int) inputField);
break;
case BIGINT:
- sum = (long) accumulator - (long) inputField;
+ sum = subtractExactLong((long) accumulator, (long) inputField);
break;
case FLOAT:
sum = (float) accumulator - (float) inputField;
@@ -142,13 +142,13 @@ public class FieldSumAgg extends FieldAggregator {
return Decimal.fromBigDecimal(
decimal.toBigDecimal().negate(), decimal.precision(),
decimal.scale());
case TINYINT:
- return (byte) -((byte) value);
+ return negateExactByte((byte) value);
case SMALLINT:
- return (short) -((short) value);
+ return negateExactShort((short) value);
case INTEGER:
- return -((int) value);
+ return negateExactInt((int) value);
case BIGINT:
- return -((long) value);
+ return negateExactLong((long) value);
case FLOAT:
return -((float) value);
case DOUBLE:
@@ -161,4 +161,104 @@ public class FieldSumAgg extends FieldAggregator {
throw new IllegalArgumentException(msg);
}
}
+
+ private static byte addExactByte(byte a, byte b) {
+ int value = a + b;
+ if (value > Byte.MAX_VALUE || value < Byte.MIN_VALUE) {
+ throw new ArithmeticException(
+ String.format("byte overflow: %d + %d = %d", a, b, value));
+ }
+ return (byte) value;
+ }
+
+ private static short addExactShort(short a, short b) {
+ int value = a + b;
+ if (value > Short.MAX_VALUE || value < Short.MIN_VALUE) {
+ throw new ArithmeticException(
+ String.format("short overflow: %d + %d = %d", a, b,
value));
+ }
+ return (short) value;
+ }
+
+ private static int addExactInt(int a, int b) {
+ try {
+ return Math.addExact(a, b);
+ } catch (ArithmeticException e) {
+ throw new ArithmeticException(String.format("int overflow: %d +
%d", a, b));
+ }
+ }
+
+ private static long addExactLong(long a, long b) {
+ try {
+ return Math.addExact(a, b);
+ } catch (ArithmeticException e) {
+ throw new ArithmeticException(String.format("long overflow: %d +
%d", a, b));
+ }
+ }
+
+ private static byte subtractExactByte(byte a, byte b) {
+ int value = a - b;
+ if (value > Byte.MAX_VALUE || value < Byte.MIN_VALUE) {
+ throw new ArithmeticException(
+ String.format("byte overflow: %d - %d = %d", a, b, value));
+ }
+ return (byte) value;
+ }
+
+ private static short subtractExactShort(short a, short b) {
+ int value = a - b;
+ if (value > Short.MAX_VALUE || value < Short.MIN_VALUE) {
+ throw new ArithmeticException(
+ String.format("short overflow: %d - %d = %d", a, b,
value));
+ }
+ return (short) value;
+ }
+
+ private static int subtractExactInt(int a, int b) {
+ try {
+ return Math.subtractExact(a, b);
+ } catch (ArithmeticException e) {
+ throw new ArithmeticException(String.format("int overflow: %d -
%d", a, b));
+ }
+ }
+
+ private static long subtractExactLong(long a, long b) {
+ try {
+ return Math.subtractExact(a, b);
+ } catch (ArithmeticException e) {
+ throw new ArithmeticException(String.format("long overflow: %d -
%d", a, b));
+ }
+ }
+
+ private static byte negateExactByte(byte a) {
+ int value = -a;
+ if (value > Byte.MAX_VALUE || value < Byte.MIN_VALUE) {
+ throw new ArithmeticException(String.format("byte overflow: -%d =
%d", a, value));
+ }
+ return (byte) value;
+ }
+
+ private static short negateExactShort(short a) {
+ int value = -a;
+ if (value > Short.MAX_VALUE || value < Short.MIN_VALUE) {
+ throw new ArithmeticException(String.format("short overflow: -%d =
%d", a, value));
+ }
+ return (short) value;
+ }
+
+ private static int negateExactInt(int a) {
+ try {
+ return Math.negateExact(a);
+ } catch (ArithmeticException e) {
+ throw new ArithmeticException(String.format("int overflow: -%d",
a));
+ }
+ }
+
+ private static long negateExactLong(long a) {
+ try {
+ return Math.negateExact(a);
+ } catch (ArithmeticException e) {
+ throw new ArithmeticException(String.format("long overflow: -%d",
a));
+ }
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index a6a2794c36..5496503b77 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -575,6 +575,87 @@ public class FieldAggregatorTest {
.isInstanceOf(ArithmeticException.class);
}
+ @Test
+ public void testFieldSumByteOverflow() {
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
TinyIntType(), null, null);
+ assertThatThrownBy(() -> fieldSumAgg.agg(Byte.MAX_VALUE, (byte) 1))
+ .isInstanceOf(ArithmeticException.class);
+ assertThatThrownBy(() -> fieldSumAgg.agg(Byte.MIN_VALUE, (byte) -1))
+ .isInstanceOf(ArithmeticException.class);
+ }
+
+ @Test
+ public void testFieldSumShortOverflow() {
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
SmallIntType(), null, null);
+ assertThatThrownBy(() -> fieldSumAgg.agg(Short.MAX_VALUE, (short) 1))
+ .isInstanceOf(ArithmeticException.class);
+ assertThatThrownBy(() -> fieldSumAgg.agg(Short.MIN_VALUE, (short) -1))
+ .isInstanceOf(ArithmeticException.class);
+ }
+
+ @Test
+ public void testFieldSumIntOverflow() {
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
IntType(), null, null);
+ assertThatThrownBy(() -> fieldSumAgg.agg(Integer.MAX_VALUE, 1))
+ .isInstanceOf(ArithmeticException.class);
+ assertThatThrownBy(() -> fieldSumAgg.agg(Integer.MIN_VALUE, -1))
+ .isInstanceOf(ArithmeticException.class);
+ }
+
+ @Test
+ public void testFieldSumLongOverflow() {
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
BigIntType(), null, null);
+ assertThatThrownBy(() -> fieldSumAgg.agg(Long.MAX_VALUE, 1L))
+ .isInstanceOf(ArithmeticException.class);
+ assertThatThrownBy(() -> fieldSumAgg.agg(Long.MIN_VALUE, -1L))
+ .isInstanceOf(ArithmeticException.class);
+ }
+
+ @Test
+ public void testFieldSumByteRetractOverflow() {
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
TinyIntType(), null, null);
+ assertThatThrownBy(() -> fieldSumAgg.retract(Byte.MIN_VALUE, (byte) 1))
+ .isInstanceOf(ArithmeticException.class);
+ assertThatThrownBy(() -> fieldSumAgg.retract(Byte.MAX_VALUE, (byte)
-1))
+ .isInstanceOf(ArithmeticException.class);
+ // retract(null, MIN_VALUE) negates inputField, which also overflows.
+ assertThatThrownBy(() -> fieldSumAgg.retract(null, Byte.MIN_VALUE))
+ .isInstanceOf(ArithmeticException.class);
+ }
+
+ @Test
+ public void testFieldSumShortRetractOverflow() {
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
SmallIntType(), null, null);
+ assertThatThrownBy(() -> fieldSumAgg.retract(Short.MIN_VALUE, (short)
1))
+ .isInstanceOf(ArithmeticException.class);
+ assertThatThrownBy(() -> fieldSumAgg.retract(Short.MAX_VALUE, (short)
-1))
+ .isInstanceOf(ArithmeticException.class);
+ assertThatThrownBy(() -> fieldSumAgg.retract(null, Short.MIN_VALUE))
+ .isInstanceOf(ArithmeticException.class);
+ }
+
+ @Test
+ public void testFieldSumIntRetractOverflow() {
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
IntType(), null, null);
+ assertThatThrownBy(() -> fieldSumAgg.retract(Integer.MIN_VALUE, 1))
+ .isInstanceOf(ArithmeticException.class);
+ assertThatThrownBy(() -> fieldSumAgg.retract(Integer.MAX_VALUE, -1))
+ .isInstanceOf(ArithmeticException.class);
+ assertThatThrownBy(() -> fieldSumAgg.retract(null, Integer.MIN_VALUE))
+ .isInstanceOf(ArithmeticException.class);
+ }
+
+ @Test
+ public void testFieldSumLongRetractOverflow() {
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
BigIntType(), null, null);
+ assertThatThrownBy(() -> fieldSumAgg.retract(Long.MIN_VALUE, 1L))
+ .isInstanceOf(ArithmeticException.class);
+ assertThatThrownBy(() -> fieldSumAgg.retract(Long.MAX_VALUE, -1L))
+ .isInstanceOf(ArithmeticException.class);
+ assertThatThrownBy(() -> fieldSumAgg.retract(null, Long.MIN_VALUE))
+ .isInstanceOf(ArithmeticException.class);
+ }
+
@Test
public void testFieldProductFloatAgg() {
FieldProductAgg fieldProductAgg =