This is an automated email from the ASF dual-hosted git repository.

chenli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new ab3317b8ef fix(AttributeTypeUtils): use Double.NEGATIVE_INFINITY 
instead of Double.MIN_VALUE (#4145)
ab3317b8ef is described below

commit ab3317b8ef12e1b56205bf0f35c5ae1edde42d9d
Author: carloea2 <[email protected]>
AuthorDate: Sat Jan 10 14:01:24 2026 -0800

    fix(AttributeTypeUtils): use Double.NEGATIVE_INFINITY instead of 
Double.MIN_VALUE (#4145)
    
    ### What changes were proposed in this PR?
    
    This PR fixes the value returned by `minValue` for
    `AttributeType.DOUBLE` in `AggregationOperation.scala`.
    
    Previously, the code used `Double.MIN_VALUE`, which is the smallest
    positive non-zero double, not the most-negative value.
    
    
    
https://github.com/apache/texera/blob/07c35d004a1185e4098b56591166b62cc9ab4856/common/workflow-operator/src/main/scala/org/apache/amber/operator/aggregate/AggregationOperation.scala#L335-L345
    
    The fix replaces `Double.MIN_VALUE` with `Double.NEGATIVE_INFINITY` and
    add new testing.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4144
    Related discussion: #4049 (Clarify minValue intent in
    AggregationOperation)
    
    ### How was this PR tested?
    
    - AggregateOpSpec.scala
    - AttributeTypeUtilsSpec.scala
    - Frontend Manual Test
    
    ### Was this PR authored or co-authored using generative AI tooling?
    No
    
    ---------
    
    Co-authored-by: Xiaozhen Liu <[email protected]>
---
 .../amber/core/tuple/AttributeTypeUtils.scala      |   4 +-
 .../amber/core/tuple/AttributeTypeUtilsSpec.scala  |   2 +-
 .../amber/operator/aggregate/AggregateOpSpec.scala | 170 +++++++++++++++++++++
 3 files changed, 173 insertions(+), 3 deletions(-)

diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala
index 208f94d040..17c3dfef33 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtils.scala
@@ -509,7 +509,7 @@ object AttributeTypeUtils extends Serializable {
         )
     }
 
-  /** Returns the minimum possible value for a given attribute type. (note 
Double.MIN_VALUE is > 0).
+  /** Returns the minimum possible value for a given attribute type.
     * For BINARY under lexicographic order, the empty array is the global 
minimum.
     */
   @throws[UnsupportedOperationException]
@@ -517,7 +517,7 @@ object AttributeTypeUtils extends Serializable {
     attrType match {
       case AttributeType.INTEGER   => 
java.lang.Integer.valueOf(Integer.MIN_VALUE)
       case AttributeType.LONG      => 
java.lang.Long.valueOf(java.lang.Long.MIN_VALUE)
-      case AttributeType.DOUBLE    => 
java.lang.Double.valueOf(java.lang.Double.MIN_VALUE)
+      case AttributeType.DOUBLE    => 
java.lang.Double.valueOf(java.lang.Double.NEGATIVE_INFINITY)
       case AttributeType.TIMESTAMP => new Timestamp(0L)
       case AttributeType.BINARY    => Array.emptyByteArray
       case _ =>
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala
index defa567480..446ce518fa 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/tuple/AttributeTypeUtilsSpec.scala
@@ -364,7 +364,7 @@ class AttributeTypeUtilsSpec extends AnyFunSuite {
 
     assert(integerMin == Int.MinValue)
     assert(longMin == Long.MinValue)
-    assert(doubleMin == java.lang.Double.MIN_VALUE)
+    assert(doubleMin == java.lang.Double.NEGATIVE_INFINITY)
   }
 
   test("minValue returns timestamp epoch and empty binary array, and fails for 
unsupported types") {
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpSpec.scala
index e43f3398cc..170efee773 100644
--- 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpSpec.scala
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/aggregate/AggregateOpSpec.scala
@@ -203,6 +203,176 @@ class AggregateOpSpec extends AnyFunSuite {
     assert(result == 250L)
   }
 
+  test(
+    "MIN aggregation (DOUBLE) is solid: empty/null/NaN, infinities, signed 
zero, and many values"
+  ) {
+    val schema = makeSchema("temperature" -> AttributeType.DOUBLE)
+
+    val operation = makeAggregationOp(AggregationFunction.MIN, "temperature", 
"min_temp")
+    val agg = operation.getAggFunc(AttributeType.DOUBLE)
+
+    // -----------------------
+    // 0) Empty input => null
+    // -----------------------
+    val emptyPartial = agg.init()
+    val emptyResult = agg.finalAgg(emptyPartial)
+    assert(emptyResult == null)
+
+    // ---------------------------------------------------------
+    // 1) Only nulls / only NaNs => null
+    // ---------------------------------------------------------
+    val onlyNulls = Seq(makeTuple(schema, null), makeTuple(schema, null), 
makeTuple(schema, null))
+    var partialNulls = agg.init()
+    onlyNulls.foreach(tp => partialNulls = agg.iterate(partialNulls, tp))
+    assert(agg.finalAgg(partialNulls) == null)
+
+    val onlyNaNs = Seq(makeTuple(schema, Double.NaN), makeTuple(schema, 
Double.NaN))
+    var partialNans = agg.init()
+    onlyNaNs.foreach(tp => partialNans = agg.iterate(partialNans, tp))
+    assert(agg.finalAgg(partialNans) == null)
+
+    // ---------------------------------------------------------
+    // 2) Basic decimals + negatives: should find the true minimum
+    // ---------------------------------------------------------
+    val basics = Seq(
+      makeTuple(schema, 10.25),
+      makeTuple(schema, -2.5),
+      makeTuple(schema, 5.0),
+      makeTuple(schema, -2.5000000001), // slightly smaller than -2.5
+      makeTuple(schema, 1.0e-12)
+    )
+
+    var partial = agg.init()
+    basics.foreach(tp => partial = agg.iterate(partial, tp))
+
+    val basicResult = agg.finalAgg(partial).asInstanceOf[Number].doubleValue()
+    assert(basicResult == -2.5000000001)
+
+    // ---------------------------------------------------------
+    // 3) NaN + null interleaving must not poison the result
+    //    (especially if NaN appears first)
+    // ---------------------------------------------------------
+    val mixed = Seq(
+      makeTuple(schema, Double.NaN),
+      makeTuple(schema, null),
+      makeTuple(schema, 3.14159),
+      makeTuple(schema, -0.125),
+      makeTuple(schema, Double.NaN),
+      makeTuple(schema, -9999.0),
+      makeTuple(schema, null)
+    )
+
+    var partialMixed = agg.init()
+    mixed.foreach(tp => partialMixed = agg.iterate(partialMixed, tp))
+
+    val mixedResult = 
agg.finalAgg(partialMixed).asInstanceOf[Number].doubleValue()
+    assert(mixedResult == -9999.0)
+
+    // ---------------------------------------------------------
+    // 4) Infinities: min should be -Infinity if present
+    // ---------------------------------------------------------
+    val infinities = Seq(
+      makeTuple(schema, Double.PositiveInfinity),
+      makeTuple(schema, 42.0),
+      makeTuple(schema, Double.NegativeInfinity),
+      makeTuple(schema, -1.0)
+    )
+
+    var partialInf = agg.init()
+    infinities.foreach(tp => partialInf = agg.iterate(partialInf, tp))
+
+    val infResult = agg.finalAgg(partialInf).asInstanceOf[Number].doubleValue()
+    assert(infResult.isNegInfinity)
+
+    // ---------------------------------------------------------
+    // 5) Signed zero: MIN(-0.0, +0.0) should be -0.0
+    // ---------------------------------------------------------
+    val signedZero = Seq(makeTuple(schema, 0.0), makeTuple(schema, -0.0), 
makeTuple(schema, 0.0))
+    var partialZero = agg.init()
+    signedZero.foreach(tp => partialZero = agg.iterate(partialZero, tp))
+
+    val zeroValue = 
agg.finalAgg(partialZero).asInstanceOf[Number].doubleValue()
+    val zeroBits = java.lang.Double.doubleToRawLongBits(zeroValue)
+    val negZeroBits = java.lang.Double.doubleToRawLongBits(-0.0)
+    assert(zeroBits == negZeroBits)
+
+    // ---------------------------------------------------------
+    // 6) Stress test: many values, compare against a reference min
+    //    Reference rule here: ignore null and NaN; return null if none left.
+    // ---------------------------------------------------------
+    val rng = new scala.util.Random(1337)
+    val values: Seq[java.lang.Double] =
+      (1 to 10000).map { index =>
+        index % 250 match {
+          case 0 => null
+          case 1 => Double.NaN
+          case 2 => Double.PositiveInfinity
+          case 3 => Double.NegativeInfinity
+          case 4 => -0.0
+          case _ =>
+            // wide-ish range with some tiny magnitudes too
+            val sign = if (rng.nextBoolean()) 1.0 else -1.0
+            sign * (rng.nextDouble() * 1.0e6) / (if (rng.nextInt(20) == 0) 
1.0e12 else 1.0)
+        }
+      }
+
+    val expected: java.lang.Double = {
+      var found = false
+      var currentMin = 0.0
+      values.foreach { x =>
+        if (x != null && !java.lang.Double.isNaN(x)) {
+          if (!found) {
+            currentMin = x; found = true
+          } else if (java.lang.Double.compare(x, currentMin) < 0) currentMin = 
x
+        }
+      }
+      if (!found) null else currentMin
+    }
+
+    var partStress = agg.init()
+    values.foreach(v => partStress = agg.iterate(partStress, makeTuple(schema, 
v)))
+    val gotAny = agg.finalAgg(partStress)
+
+    if (expected == null) {
+      assert(gotAny == null)
+    } else {
+      val got = gotAny.asInstanceOf[Number].doubleValue()
+      // exact match: should be one of the seen inputs, no tolerance needed
+      if (
+        expected == 0.0 && java.lang.Double.doubleToRawLongBits(expected) != 
java.lang.Double
+          .doubleToRawLongBits(got)
+      ) {
+        // If expected is -0.0, enforce it
+        assert(
+          java.lang.Double.doubleToRawLongBits(got) == 
java.lang.Double.doubleToRawLongBits(
+            expected
+          )
+        )
+      } else {
+        assert(got == expected)
+      }
+    }
+  }
+
+  test("MAX aggregation finds largest DOUBLE value") {
+    val maxValue = 99.144
+    val schema = makeSchema("debt" -> AttributeType.DOUBLE)
+    val tuple1 = makeTuple(schema, -100.123)
+    val tuple2 = makeTuple(schema, 50.12)
+    val tuple3 = makeTuple(schema, maxValue)
+
+    val operation = makeAggregationOp(AggregationFunction.MAX, "debt", 
"nax_debt")
+    val agg = operation.getAggFunc(AttributeType.DOUBLE)
+
+    var partial = agg.init()
+    partial = agg.iterate(partial, tuple1)
+    partial = agg.iterate(partial, tuple2)
+    partial = agg.iterate(partial, tuple3)
+
+    val result = 
agg.finalAgg(partial).asInstanceOf[java.lang.Double].doubleValue()
+    assert(result == maxValue)
+  }
+
   test("AVERAGE aggregation ignores nulls and returns null when all values are 
null") {
     val schema = makeSchema("price" -> AttributeType.DOUBLE)
     val tuple1 = makeTuple(schema, 10.0)

Reply via email to