gemini-code-assist[bot] commented on code in PR #38871:
URL: https://github.com/apache/beam/pull/38871#discussion_r3405082912


##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFn.java:
##########
@@ -133,7 +147,19 @@ public Coder<VarianceAccumulator> getAccumulatorCoder(
 
   @Override
   public T extractOutput(VarianceAccumulator accumulator) {
-    return decimalConverter.apply(getVariance(accumulator));
+    BigDecimal result = getVariance(accumulator);
+    if (result != null && isStddev) {
+      double doubleVal = result.doubleValue();
+      if (doubleVal < 0.0) {
+        doubleVal = 0.0; // Clamp negative variance due to numerical instability
+      }
+      double sqrtVal = Math.sqrt(doubleVal);
+      if (Double.isInfinite(sqrtVal)) {
+        throw new ArithmeticException("Standard deviation overflow: result is infinity");
+      }
+      result = BigDecimal.valueOf(sqrtVal);
+    }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Converting the `BigDecimal` variance to a `double` before taking the square root can cause artificial overflow or underflow. For example, if the variance is `1e400` (a valid `BigDecimal`), `result.doubleValue()` overflows to `Double.POSITIVE_INFINITY` and an `ArithmeticException` is thrown, even though the actual standard deviation (`1e200`) is representable as a finite `double` and `BigDecimal`. Conversely, a variance of `1e-400` underflows to `0.0`, so the result is `0` rather than `1e-200`.
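   
   A quick standalone check of the failure mode, using only `java.math.BigDecimal` (the class name `DoubleRangeDemo` is illustrative): `doubleValue()` maps `1e400` to infinity and `1e-400` to `0.0`, even though their square roots are ordinary doubles.
   
   ```java
   import java.math.BigDecimal;
   
   public class DoubleRangeDemo {
     public static void main(String[] args) {
       BigDecimal hugeVariance = new BigDecimal("1e400");
       BigDecimal tinyVariance = new BigDecimal("1e-400");
       // Both values lie outside the finite, non-zero range of double.
       System.out.println(hugeVariance.doubleValue()); // Infinity
       System.out.println(tinyVariance.doubleValue()); // 0.0
       // So the square roots (really 1e200 and 1e-200) come out wrong as well.
       System.out.println(Math.sqrt(hugeVariance.doubleValue())); // Infinity
       System.out.println(Math.sqrt(tinyVariance.doubleValue())); // 0.0
     }
   }
   ```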
   
   To prevent this, scale the `BigDecimal` by an even power of 10 (`10^(-2k)`) to bring it into the representable range of `double`, take the square root, and then scale the result back by half that power (`10^k`).
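   
   The identity behind the rescaling, worked through for the `1e400` example. For $v = 10^{400}$, `precision() - scale()` is 401, which rounds up to the even exponent $2k = 402$:
   
   ```math
   \sqrt{v} = \sqrt{v \cdot 10^{-2k}} \cdot 10^{k}
   \quad\Longrightarrow\quad
   \sqrt{10^{400}} = \sqrt{10^{400} \cdot 10^{-402}} \cdot 10^{201} = \sqrt{0.01} \cdot 10^{201} = 0.1 \cdot 10^{201} = 10^{200}
   ```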
   
   ```java
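       BigDecimal result = getVariance(accumulator);
       if (result != null && isStddev) {
         double doubleVal = result.doubleValue();
         double sqrtVal;
         // doubleValue() overflowed to infinity, or underflowed to 0.0 for a non-zero variance.
         if (Double.isInfinite(doubleVal) || (doubleVal == 0.0 && result.signum() != 0)) {
           // precision() - scale() is roughly the base-10 exponent of the variance; round it
           // away from zero to an even number so that half of it is an integer.
           int scaleAdjustment = result.precision() - result.scale();
           if (scaleAdjustment % 2 != 0) {
             scaleAdjustment += (scaleAdjustment > 0) ? 1 : -1;
           }
           // Bring the variance into double range by multiplying by 10^(-scaleAdjustment).
           BigDecimal scaledResult = result.scaleByPowerOfTen(-scaleAdjustment);
           doubleVal = scaledResult.doubleValue();
           if (doubleVal < 0.0) {
             doubleVal = 0.0; // Clamp negative variance due to numerical instability
           }
           sqrtVal = Math.sqrt(doubleVal);
           // Undo the scaling: sqrt(v) = sqrt(v * 10^(-2k)) * 10^k with 2k = scaleAdjustment.
           result =
               BigDecimal.valueOf(sqrtVal).scaleByPowerOfTen(scaleAdjustment / 2);
         } else {
           if (doubleVal < 0.0) {
             doubleVal = 0.0; // Clamp negative variance due to numerical instability
           }
           sqrtVal = Math.sqrt(doubleVal);
           // Defensive only: doubleVal is finite here, and sqrt of a finite double is finite.
           if (Double.isInfinite(sqrtVal)) {
             throw new ArithmeticException("Standard deviation overflow: result is infinity");
           }
           result = BigDecimal.valueOf(sqrtVal);
         }
       }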
   ```
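   
   If the module can rely on Java 9 or later (an assumption; Beam's minimum supported Java version should be checked), `BigDecimal.sqrt(MathContext)` avoids the `double` round trip entirely. A minimal sketch; the class and helper name `StddevSqrt.stddevFromVariance` and the choice of `MathContext.DECIMAL128` are illustrative, not part of the PR:
   
   ```java
   import java.math.BigDecimal;
   import java.math.MathContext;
   
   public class StddevSqrt {
     /** Hypothetical helper: square root of a variance computed entirely in BigDecimal. */
     static BigDecimal stddevFromVariance(BigDecimal variance) {
       if (variance == null) {
         return null;
       }
       // Clamp small negative variances caused by numerical instability, as the PR does.
       if (variance.signum() < 0) {
         return BigDecimal.ZERO;
       }
       // BigDecimal.sqrt (Java 9+) works over the full BigDecimal range, so nothing
       // overflows or underflows before the square root is taken.
       return variance.sqrt(MathContext.DECIMAL128);
     }
   
     public static void main(String[] args) {
       System.out.println(stddevFromVariance(new BigDecimal("1e400")).doubleValue()); // 1.0E200
       System.out.println(stddevFromVariance(new BigDecimal("1e-400")).doubleValue()); // 1.0E-200
       System.out.println(stddevFromVariance(new BigDecimal("9")).doubleValue()); // 3.0
     }
   }
   ```
   
   `DECIMAL128` keeps 34 significant digits, more than a `double` holds, so the later conversion through `decimalConverter` rather than the square root dominates the rounding error.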



##########
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFnTest.java:
##########
@@ -51,6 +52,16 @@ public static Iterable<Object[]> varianceFns() {
             VarianceFn.newSample(BigDecimal::intValue),
             newVarianceAccumulator(FIFTEEN, FOUR, ZERO),
             5
+          },
+          {
+            VarianceFn.newPopulationStddev(Schema.TypeName.INT32),
+            newVarianceAccumulator(new BigDecimal(36), new BigDecimal(4), ZERO),
+            3
+          },
+          {
+            VarianceFn.newSampleStddev(Schema.TypeName.INT32),
+            newVarianceAccumulator(new BigDecimal(36), new BigDecimal(5), ZERO),
+            3
           }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Consider adding test cases for extremely large and extremely small variances to exercise the scaling logic and guard against regressions of the artificial overflow/underflow.
   
   ```java
             {
               VarianceFn.newPopulationStddev(Schema.TypeName.INT32),
               newVarianceAccumulator(new BigDecimal(36), new BigDecimal(4), ZERO),
               3
             },
             {
               VarianceFn.newSampleStddev(Schema.TypeName.INT32),
               newVarianceAccumulator(new BigDecimal(36), new BigDecimal(5), ZERO),
               3
             },
             {
               // Stddev 1e200 is a finite double even though the variance 1e400 is not.
               VarianceFn.newPopulationStddev(Schema.TypeName.DOUBLE),
               newVarianceAccumulator(new BigDecimal("1e400"), BigDecimal.ONE, ZERO),
               1e200
             },
             {
               // Stddev 1e-200 is a normal double even though the variance 1e-400 underflows.
               VarianceFn.newPopulationStddev(Schema.TypeName.DOUBLE),
               newVarianceAccumulator(new BigDecimal("1e-400"), BigDecimal.ONE, ZERO),
               1e-200
             }
   ```
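   
   The expected values in the two extreme cases can be sanity-checked outside Beam with a standalone copy of the rescaling logic suggested for `VarianceFn.extractOutput`. In this sketch the names `ScaledSqrtCheck` and `scaledStddev` are hypothetical, and the input is the variance itself rather than an accumulator:
   
   ```java
   import java.math.BigDecimal;
   
   public class ScaledSqrtCheck {
     /** Hypothetical standalone copy of the rescaling logic suggested in the review. */
     static BigDecimal scaledStddev(BigDecimal variance) {
       double doubleVal = variance.doubleValue();
       boolean outOfRange =
           Double.isInfinite(doubleVal) || (doubleVal == 0.0 && variance.signum() != 0);
       if (!outOfRange) {
         // Ordinary case: the variance already fits in a double; clamp negatives to 0.
         return BigDecimal.valueOf(Math.sqrt(Math.max(doubleVal, 0.0)));
       }
       // Roughly the base-10 exponent of the variance, rounded away from zero to an even number.
       int scaleAdjustment = variance.precision() - variance.scale();
       if (scaleAdjustment % 2 != 0) {
         scaleAdjustment += (scaleAdjustment > 0) ? 1 : -1;
       }
       double scaled = variance.scaleByPowerOfTen(-scaleAdjustment).doubleValue();
       double sqrtVal = Math.sqrt(Math.max(scaled, 0.0)); // clamp negative variance to 0
       // Multiply back by 10^(scaleAdjustment / 2), i.e. half the removed exponent.
       return BigDecimal.valueOf(sqrtVal).scaleByPowerOfTen(scaleAdjustment / 2);
     }
   
     public static void main(String[] args) {
       System.out.println(scaledStddev(new BigDecimal("1e400")).doubleValue()); // 1.0E200
       System.out.println(scaledStddev(new BigDecimal("1e-400")).doubleValue()); // 1.0E-200
       System.out.println(scaledStddev(new BigDecimal("9")).doubleValue()); // 3.0
     }
   }
   ```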



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFn.java:
##########
@@ -95,11 +99,21 @@ public static VarianceFn newSample(Schema.TypeName typeName) {
   public static <V extends Number> VarianceFn newSample(
       SerializableFunction<BigDecimal, V> decimalConverter) {
 
-    return new VarianceFn<>(SAMPLE, decimalConverter);
+    return new VarianceFn<>(SAMPLE, false, decimalConverter);
   }
 
-  private VarianceFn(boolean isSample, SerializableFunction<BigDecimal, T> decimalConverter) {
+  public static VarianceFn newSampleStddev(Schema.TypeName typeName) {
+    return new VarianceFn<>(SAMPLE, true, BigDecimalConverter.forSqlType(typeName));
+  }
+
+  public static VarianceFn newPopulationStddev(Schema.TypeName typeName) {
+    return new VarianceFn<>(POP, true, BigDecimalConverter.forSqlType(typeName));
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To maintain API consistency with `newPopulation` and `newSample`, we should 
also provide overloads for `newSampleStddev` and `newPopulationStddev` that 
accept a custom `SerializableFunction<BigDecimal, V> decimalConverter`.
   
   ```java
     public static VarianceFn newSampleStddev(Schema.TypeName typeName) {
       return newSampleStddev(BigDecimalConverter.forSqlType(typeName));
     }
   
     public static <V extends Number> VarianceFn newSampleStddev(
         SerializableFunction<BigDecimal, V> decimalConverter) {
       return new VarianceFn<>(SAMPLE, true, decimalConverter);
     }
   
     public static VarianceFn newPopulationStddev(Schema.TypeName typeName) {
       return newPopulationStddev(BigDecimalConverter.forSqlType(typeName));
     }
   
     public static <V extends Number> VarianceFn newPopulationStddev(
         SerializableFunction<BigDecimal, V> decimalConverter) {
       return new VarianceFn<>(POP, true, decimalConverter);
     }
   ```
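   
   The delegating-overload pattern compiles even though the `TypeName` overload passes a converter whose output type is a wildcard, because Java infers `V` from the captured type. A self-contained sketch with hypothetical stand-ins: `StatFn` for `VarianceFn`, `forTypeName` for `BigDecimalConverter.forSqlType` (assumed here to return a `? extends Number` converter), and `java.util.function.Function` in place of Beam's `SerializableFunction`:
   
   ```java
   import java.math.BigDecimal;
   import java.util.function.Function;
   
   public class StddevFactoryDemo {
     // Hypothetical stand-in for VarianceFn: a stddev flag plus an output converter.
     static final class StatFn<T extends Number> {
       final boolean isStddev;
       final Function<BigDecimal, T> converter;
   
       private StatFn(boolean isStddev, Function<BigDecimal, T> converter) {
         this.isStddev = isStddev;
         this.converter = converter;
       }
   
       T convert(BigDecimal value) {
         return converter.apply(value);
       }
     }
   
     // Hypothetical stand-in for BigDecimalConverter.forSqlType (return type assumed).
     static Function<BigDecimal, ? extends Number> forTypeName(String typeName) {
       switch (typeName) {
         case "INT32":
           return BigDecimal::intValue;
         case "DOUBLE":
           return BigDecimal::doubleValue;
         default:
           throw new IllegalArgumentException("Unsupported type: " + typeName);
       }
     }
   
     // Convenience overload: resolve the converter, then delegate.
     static StatFn<?> newSampleStddev(String typeName) {
       return newSampleStddev(forTypeName(typeName));
     }
   
     // General overload: the only place that calls the constructor.
     static <V extends Number> StatFn<V> newSampleStddev(Function<BigDecimal, V> converter) {
       return new StatFn<>(true, converter);
     }
   
     public static void main(String[] args) {
       StatFn<?> byType = newSampleStddev("INT32");
       StatFn<Integer> custom = newSampleStddev(BigDecimal::intValue);
       System.out.println(byType.convert(new BigDecimal("3.7"))); // 3
       System.out.println(custom.convert(new BigDecimal("3.7"))); // 3
     }
   }
   ```
   
   Routing the `TypeName` overload through the converter overload keeps a single call site for the constructor, which is the consistency the comment asks for.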



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
