This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2686d48fc3 [core] Fix data corruption for FieldSumAgg (#7992)
2686d48fc3 is described below

commit 2686d48fc3b97b6ba326a444fcfacf6e419c89f0
Author: Arnav Balyan <[email protected]>
AuthorDate: Sat Jun 6 14:12:46 2026 +0530

    [core] Fix data corruption for FieldSumAgg (#7992)
---
 .../mergetree/compact/aggregate/FieldSumAgg.java   | 126 ++++++++++++++++++---
 .../compact/aggregate/FieldAggregatorTest.java     |  81 +++++++++++++
 2 files changed, 194 insertions(+), 13 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
index 4b3ad12aea..e9d868e082 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
@@ -22,7 +22,7 @@ import org.apache.paimon.data.Decimal;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.DecimalUtils;
 
-/** sum aggregate a field of a row. */
+/** Sum aggregate a field of a row. */
 public class FieldSumAgg extends FieldAggregator {
 
     private static final long serialVersionUID = 1L;
@@ -55,16 +55,16 @@ public class FieldSumAgg extends FieldAggregator {
                                 mergeFieldDD.scale());
                 break;
             case TINYINT:
-                sum = (byte) ((byte) accumulator + (byte) inputField);
+                sum = addExactByte((byte) accumulator, (byte) inputField);
                 break;
             case SMALLINT:
-                sum = (short) ((short) accumulator + (short) inputField);
+                sum = addExactShort((short) accumulator, (short) inputField);
                 break;
             case INTEGER:
-                sum = (int) accumulator + (int) inputField;
+                sum = addExactInt((int) accumulator, (int) inputField);
                 break;
             case BIGINT:
-                sum = (long) accumulator + (long) inputField;
+                sum = addExactLong((long) accumulator, (long) inputField);
                 break;
             case FLOAT:
                 sum = (float) accumulator + (float) inputField;
@@ -105,16 +105,16 @@ public class FieldSumAgg extends FieldAggregator {
                                 mergeFieldDD.scale());
                 break;
             case TINYINT:
-                sum = (byte) ((byte) accumulator - (byte) inputField);
+                sum = subtractExactByte((byte) accumulator, (byte) inputField);
                 break;
             case SMALLINT:
-                sum = (short) ((short) accumulator - (short) inputField);
+                sum = subtractExactShort((short) accumulator, (short) inputField);
                 break;
             case INTEGER:
-                sum = (int) accumulator - (int) inputField;
+                sum = subtractExactInt((int) accumulator, (int) inputField);
                 break;
             case BIGINT:
-                sum = (long) accumulator - (long) inputField;
+                sum = subtractExactLong((long) accumulator, (long) inputField);
                 break;
             case FLOAT:
                 sum = (float) accumulator - (float) inputField;
@@ -142,13 +142,13 @@ public class FieldSumAgg extends FieldAggregator {
                 return Decimal.fromBigDecimal(
                         decimal.toBigDecimal().negate(), decimal.precision(), decimal.scale());
             case TINYINT:
-                return (byte) -((byte) value);
+                return negateExactByte((byte) value);
             case SMALLINT:
-                return (short) -((short) value);
+                return negateExactShort((short) value);
             case INTEGER:
-                return -((int) value);
+                return negateExactInt((int) value);
             case BIGINT:
-                return -((long) value);
+                return negateExactLong((long) value);
             case FLOAT:
                 return -((float) value);
             case DOUBLE:
@@ -161,4 +161,104 @@ public class FieldSumAgg extends FieldAggregator {
                 throw new IllegalArgumentException(msg);
         }
     }
+
+    private static byte addExactByte(byte a, byte b) {
+        int value = a + b;
+        if (value > Byte.MAX_VALUE || value < Byte.MIN_VALUE) {
+            throw new ArithmeticException(
+                    String.format("byte overflow: %d + %d = %d", a, b, value));
+        }
+        return (byte) value;
+    }
+
+    private static short addExactShort(short a, short b) {
+        int value = a + b;
+        if (value > Short.MAX_VALUE || value < Short.MIN_VALUE) {
+            throw new ArithmeticException(
+                    String.format("short overflow: %d + %d = %d", a, b, value));
+        }
+        return (short) value;
+    }
+
+    private static int addExactInt(int a, int b) {
+        try {
+            return Math.addExact(a, b);
+        } catch (ArithmeticException e) {
+            throw new ArithmeticException(String.format("int overflow: %d + %d", a, b));
+        }
+    }
+
+    private static long addExactLong(long a, long b) {
+        try {
+            return Math.addExact(a, b);
+        } catch (ArithmeticException e) {
+            throw new ArithmeticException(String.format("long overflow: %d + %d", a, b));
+        }
+    }
+
+    private static byte subtractExactByte(byte a, byte b) {
+        int value = a - b;
+        if (value > Byte.MAX_VALUE || value < Byte.MIN_VALUE) {
+            throw new ArithmeticException(
+                    String.format("byte overflow: %d - %d = %d", a, b, value));
+        }
+        return (byte) value;
+    }
+
+    private static short subtractExactShort(short a, short b) {
+        int value = a - b;
+        if (value > Short.MAX_VALUE || value < Short.MIN_VALUE) {
+            throw new ArithmeticException(
+                    String.format("short overflow: %d - %d = %d", a, b, value));
+        }
+        return (short) value;
+    }
+
+    private static int subtractExactInt(int a, int b) {
+        try {
+            return Math.subtractExact(a, b);
+        } catch (ArithmeticException e) {
+            throw new ArithmeticException(String.format("int overflow: %d - %d", a, b));
+        }
+    }
+
+    private static long subtractExactLong(long a, long b) {
+        try {
+            return Math.subtractExact(a, b);
+        } catch (ArithmeticException e) {
+            throw new ArithmeticException(String.format("long overflow: %d - %d", a, b));
+        }
+    }
+
+    private static byte negateExactByte(byte a) {
+        int value = -a;
+        if (value > Byte.MAX_VALUE || value < Byte.MIN_VALUE) {
+            throw new ArithmeticException(String.format("byte overflow: -%d = %d", a, value));
+        }
+        return (byte) value;
+    }
+
+    private static short negateExactShort(short a) {
+        int value = -a;
+        if (value > Short.MAX_VALUE || value < Short.MIN_VALUE) {
+            throw new ArithmeticException(String.format("short overflow: -%d = %d", a, value));
+        }
+        return (short) value;
+    }
+
+    private static int negateExactInt(int a) {
+        try {
+            return Math.negateExact(a);
+        } catch (ArithmeticException e) {
+            throw new ArithmeticException(String.format("int overflow: -%d", a));
+        }
+    }
+
+    private static long negateExactLong(long a) {
+        try {
+            return Math.negateExact(a);
+        } catch (ArithmeticException e) {
+            throw new ArithmeticException(String.format("long overflow: -%d", a));
+        }
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index a6a2794c36..5496503b77 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -575,6 +575,87 @@ public class FieldAggregatorTest {
                 .isInstanceOf(ArithmeticException.class);
     }
 
+    @Test
+    public void testFieldSumByteOverflow() {
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new TinyIntType(), null, null);
+        assertThatThrownBy(() -> fieldSumAgg.agg(Byte.MAX_VALUE, (byte) 1))
+                .isInstanceOf(ArithmeticException.class);
+        assertThatThrownBy(() -> fieldSumAgg.agg(Byte.MIN_VALUE, (byte) -1))
+                .isInstanceOf(ArithmeticException.class);
+    }
+
+    @Test
+    public void testFieldSumShortOverflow() {
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new SmallIntType(), null, null);
+        assertThatThrownBy(() -> fieldSumAgg.agg(Short.MAX_VALUE, (short) 1))
+                .isInstanceOf(ArithmeticException.class);
+        assertThatThrownBy(() -> fieldSumAgg.agg(Short.MIN_VALUE, (short) -1))
+                .isInstanceOf(ArithmeticException.class);
+    }
+
+    @Test
+    public void testFieldSumIntOverflow() {
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new IntType(), null, null);
+        assertThatThrownBy(() -> fieldSumAgg.agg(Integer.MAX_VALUE, 1))
+                .isInstanceOf(ArithmeticException.class);
+        assertThatThrownBy(() -> fieldSumAgg.agg(Integer.MIN_VALUE, -1))
+                .isInstanceOf(ArithmeticException.class);
+    }
+
+    @Test
+    public void testFieldSumLongOverflow() {
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new BigIntType(), null, null);
+        assertThatThrownBy(() -> fieldSumAgg.agg(Long.MAX_VALUE, 1L))
+                .isInstanceOf(ArithmeticException.class);
+        assertThatThrownBy(() -> fieldSumAgg.agg(Long.MIN_VALUE, -1L))
+                .isInstanceOf(ArithmeticException.class);
+    }
+
+    @Test
+    public void testFieldSumByteRetractOverflow() {
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new TinyIntType(), null, null);
+        assertThatThrownBy(() -> fieldSumAgg.retract(Byte.MIN_VALUE, (byte) 1))
+                .isInstanceOf(ArithmeticException.class);
+        assertThatThrownBy(() -> fieldSumAgg.retract(Byte.MAX_VALUE, (byte) -1))
+                .isInstanceOf(ArithmeticException.class);
+        // retract(null, MIN_VALUE) negates inputField, which also overflows.
+        assertThatThrownBy(() -> fieldSumAgg.retract(null, Byte.MIN_VALUE))
+                .isInstanceOf(ArithmeticException.class);
+    }
+
+    @Test
+    public void testFieldSumShortRetractOverflow() {
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new SmallIntType(), null, null);
+        assertThatThrownBy(() -> fieldSumAgg.retract(Short.MIN_VALUE, (short) 1))
+                .isInstanceOf(ArithmeticException.class);
+        assertThatThrownBy(() -> fieldSumAgg.retract(Short.MAX_VALUE, (short) -1))
+                .isInstanceOf(ArithmeticException.class);
+        assertThatThrownBy(() -> fieldSumAgg.retract(null, Short.MIN_VALUE))
+                .isInstanceOf(ArithmeticException.class);
+    }
+
+    @Test
+    public void testFieldSumIntRetractOverflow() {
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new IntType(), null, null);
+        assertThatThrownBy(() -> fieldSumAgg.retract(Integer.MIN_VALUE, 1))
+                .isInstanceOf(ArithmeticException.class);
+        assertThatThrownBy(() -> fieldSumAgg.retract(Integer.MAX_VALUE, -1))
+                .isInstanceOf(ArithmeticException.class);
+        assertThatThrownBy(() -> fieldSumAgg.retract(null, Integer.MIN_VALUE))
+                .isInstanceOf(ArithmeticException.class);
+    }
+
+    @Test
+    public void testFieldSumLongRetractOverflow() {
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new BigIntType(), null, null);
+        assertThatThrownBy(() -> fieldSumAgg.retract(Long.MIN_VALUE, 1L))
+                .isInstanceOf(ArithmeticException.class);
+        assertThatThrownBy(() -> fieldSumAgg.retract(Long.MAX_VALUE, -1L))
+                .isInstanceOf(ArithmeticException.class);
+        assertThatThrownBy(() -> fieldSumAgg.retract(null, Long.MIN_VALUE))
+                .isInstanceOf(ArithmeticException.class);
+    }
+
     @Test
     public void testFieldProductFloatAgg() {
         FieldProductAgg fieldProductAgg =

Reply via email to