gemini-code-assist[bot] commented on code in PR #38871:
URL: https://github.com/apache/beam/pull/38871#discussion_r3405082912
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFn.java:
##########
@@ -133,7 +147,19 @@ public Coder<VarianceAccumulator> getAccumulatorCoder(
@Override
public T extractOutput(VarianceAccumulator accumulator) {
- return decimalConverter.apply(getVariance(accumulator));
+ BigDecimal result = getVariance(accumulator);
+ if (result != null && isStddev) {
+ double doubleVal = result.doubleValue();
+ if (doubleVal < 0.0) {
+ doubleVal = 0.0; // Clamp negative variance due to numerical instability
+ }
+ double sqrtVal = Math.sqrt(doubleVal);
+ if (Double.isInfinite(sqrtVal)) {
+ throw new ArithmeticException("Standard deviation overflow: result is infinity");
+ }
+ result = BigDecimal.valueOf(sqrtVal);
+ }
Review Comment:

Converting the `BigDecimal` variance to a `double` before taking the square
root can cause artificial overflow or underflow. For example, if the variance
is `1e400` (a valid `BigDecimal`), `result.doubleValue()` overflows to
`Double.POSITIVE_INFINITY` and the code throws an `ArithmeticException`, even
though the actual standard deviation (`1e200`) is representable as both a
finite `double` and a `BigDecimal`. Conversely, a variance of `1e-400`
underflows to `0.0`, so the result is `0` instead of `1e-200`.
To prevent this, when the conversion overflows or underflows we can scale the
`BigDecimal` by an even power of 10 to bring it into the representable range
of `double`, take the square root, and then scale the result back by half
that power.
```java
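BigDecimal result = getVariance(accumulator);
if (result != null && isStddev) {
  double doubleVal = result.doubleValue();
  double sqrtVal;
  // The conversion overflowed (infinite) or underflowed (zero for a non-zero input).
  if (Double.isInfinite(doubleVal) || (doubleVal == 0.0 && result.signum() != 0)) {
    // Decimal magnitude of the value, rounded away from zero to an even number
    // so that half of it is an exact power of ten for the square root.
    int scaleAdjustment = result.precision() - result.scale();
    if (scaleAdjustment % 2 != 0) {
      scaleAdjustment += (scaleAdjustment > 0) ? 1 : -1;
    }
    BigDecimal scaledResult = result.scaleByPowerOfTen(-scaleAdjustment);
    doubleVal = scaledResult.doubleValue();
    if (doubleVal < 0.0) {
      doubleVal = 0.0; // Clamp negative variance due to numerical instability
    }
    sqrtVal = Math.sqrt(doubleVal);
    // Undo the scaling: sqrt(x * 10^n) = sqrt(x) * 10^(n / 2).
    result = BigDecimal.valueOf(sqrtVal).scaleByPowerOfTen(scaleAdjustment / 2);
  } else {
    if (doubleVal < 0.0) {
      doubleVal = 0.0; // Clamp negative variance due to numerical instability
    }
    sqrtVal = Math.sqrt(doubleVal);
    // Defensive check only: the square root of a finite double is always finite.
    if (Double.isInfinite(sqrtVal)) {
      throw new ArithmeticException("Standard deviation overflow: result is infinity");
    }
    result = BigDecimal.valueOf(sqrtVal);
  }
}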
```
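The failure mode and the scaling idea can also be reproduced outside of Beam.
The following is a minimal standalone sketch, not the PR's code: the class and
method names (`StddevScalingSketch`, `naiveSqrt`, `scaledSqrt`) are
hypothetical, and for brevity it always applies the even-power shift instead
of only doing so when the `double` conversion overflows or underflows.

```java
import java.math.BigDecimal;

final class StddevScalingSketch {

  /** Naive approach: convert to double first, then take the square root. */
  static double naiveSqrt(BigDecimal variance) {
    return Math.sqrt(Math.max(variance.doubleValue(), 0.0));
  }

  /** Shifts by an even power of ten before converting, then shifts back by half of it. */
  static BigDecimal scaledSqrt(BigDecimal variance) {
    if (variance.signum() <= 0) {
      return BigDecimal.ZERO; // Clamp zero or negative variance.
    }
    // Decimal magnitude of the value: 401 for 1e400, -399 for 1e-400.
    int shift = variance.precision() - variance.scale();
    if (shift % 2 != 0) {
      shift += (shift > 0) ? 1 : -1; // Keep the shift even so shift / 2 is exact.
    }
    // After the shift the value lies between 0.01 and 10, well inside double range.
    double scaled = variance.scaleByPowerOfTen(-shift).doubleValue();
    return BigDecimal.valueOf(Math.sqrt(scaled)).scaleByPowerOfTen(shift / 2);
  }

  public static void main(String[] args) {
    BigDecimal huge = new BigDecimal("1e400");
    BigDecimal tiny = new BigDecimal("1e-400");
    System.out.println(naiveSqrt(huge)); // Infinity: doubleValue() overflowed
    System.out.println(naiveSqrt(tiny)); // 0.0: doubleValue() underflowed
    System.out.println(scaledSqrt(huge));
    System.out.println(scaledSqrt(tiny));
  }
}
```

Because the square root itself still runs in `double` arithmetic, the result
carries at most `double` precision; the scaling only removes the range
limitation.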
##########
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFnTest.java:
##########
@@ -51,6 +52,16 @@ public static Iterable<Object[]> varianceFns() {
VarianceFn.newSample(BigDecimal::intValue),
newVarianceAccumulator(FIFTEEN, FOUR, ZERO),
5
+ },
+ {
+ VarianceFn.newPopulationStddev(Schema.TypeName.INT32),
+ newVarianceAccumulator(new BigDecimal(36), new BigDecimal(4), ZERO),
+ 3
+ },
+ {
+ VarianceFn.newSampleStddev(Schema.TypeName.INT32),
+ newVarianceAccumulator(new BigDecimal(36), new BigDecimal(5), ZERO),
+ 3
}
Review Comment:

Consider adding test cases for extremely large and extremely small variances
to verify the scaling logic and guard against regressions of the artificial
overflow/underflow.
```java
  {
    VarianceFn.newPopulationStddev(Schema.TypeName.INT32),
    newVarianceAccumulator(new BigDecimal(36), new BigDecimal(4), ZERO),
    3
  },
  {
    VarianceFn.newSampleStddev(Schema.TypeName.INT32),
    newVarianceAccumulator(new BigDecimal(36), new BigDecimal(5), ZERO),
    3
  },
  {
    VarianceFn.newPopulationStddev(Schema.TypeName.DOUBLE),
    newVarianceAccumulator(new BigDecimal("1e400"), BigDecimal.ONE, ZERO),
    1e200
  },
  {
    VarianceFn.newPopulationStddev(Schema.TypeName.DOUBLE),
    newVarianceAccumulator(new BigDecimal("1e-400"), BigDecimal.ONE, ZERO),
    1e-200
  }
```
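If these expectations are ever compared with a tolerance rather than exact
equality, note that an absolute delta is not meaningful at magnitudes such as
`1e200` and `1e-200`. A minimal sketch of the difference follows; the helper
names (`closeAbsolute`, `closeRelative`) are hypothetical, and how the
existing test compares its expected values is not shown in this diff.

```java
final class RelativeToleranceSketch {

  /** Absolute comparison: passes when the difference is at most delta. */
  static boolean closeAbsolute(double expected, double actual, double delta) {
    return Math.abs(actual - expected) <= delta;
  }

  /** Relative comparison: the allowed difference scales with the expected magnitude. */
  static boolean closeRelative(double expected, double actual, double relTol) {
    if (Double.isNaN(actual) || Double.isInfinite(actual)) {
      return false;
    }
    return Math.abs(actual - expected) <= relTol * Math.abs(expected);
  }

  public static void main(String[] args) {
    // An underflowed result of 0.0 slips past an absolute delta of 1e-9 ...
    System.out.println(closeAbsolute(1e-200, 0.0, 1e-9)); // true
    // ... but is rejected by a relative tolerance.
    System.out.println(closeRelative(1e-200, 0.0, 1e-12)); // false
    // A correct large result passes the relative check.
    System.out.println(closeRelative(1e200, 1e200, 1e-12)); // true
  }
}
```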
##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/VarianceFn.java:
##########
@@ -95,11 +99,21 @@ public static VarianceFn newSample(Schema.TypeName typeName) {
public static <V extends Number> VarianceFn newSample(
SerializableFunction<BigDecimal, V> decimalConverter) {
- return new VarianceFn<>(SAMPLE, decimalConverter);
+ return new VarianceFn<>(SAMPLE, false, decimalConverter);
}
- private VarianceFn(boolean isSample, SerializableFunction<BigDecimal, T> decimalConverter) {
+ public static VarianceFn newSampleStddev(Schema.TypeName typeName) {
+ return new VarianceFn<>(SAMPLE, true, BigDecimalConverter.forSqlType(typeName));
+ }
+
+ public static VarianceFn newPopulationStddev(Schema.TypeName typeName) {
+ return new VarianceFn<>(POP, true, BigDecimalConverter.forSqlType(typeName));
+ }
Review Comment:

To maintain API consistency with `newPopulation` and `newSample`, we should
also provide overloads for `newSampleStddev` and `newPopulationStddev` that
accept a custom `SerializableFunction<BigDecimal, V> decimalConverter`.
```java
public static VarianceFn newSampleStddev(Schema.TypeName typeName) {
  return newSampleStddev(BigDecimalConverter.forSqlType(typeName));
}

public static <V extends Number> VarianceFn newSampleStddev(
    SerializableFunction<BigDecimal, V> decimalConverter) {
  return new VarianceFn<>(SAMPLE, true, decimalConverter);
}

public static VarianceFn newPopulationStddev(Schema.TypeName typeName) {
  return newPopulationStddev(BigDecimalConverter.forSqlType(typeName));
}

public static <V extends Number> VarianceFn newPopulationStddev(
    SerializableFunction<BigDecimal, V> decimalConverter) {
  return new VarianceFn<>(POP, true, decimalConverter);
}
```
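The delegation pattern suggested above (a type-based factory that forwards to
a converter-based factory) can be illustrated in isolation. This is a
simplified sketch with hypothetical stand-ins: `StddevFactorySketch` replaces
`VarianceFn`, the `Kind` enum replaces `Schema.TypeName`, `converterFor`
replaces `BigDecimalConverter.forSqlType`, and `java.util.function.Function`
replaces `SerializableFunction`.

```java
import java.math.BigDecimal;
import java.util.function.Function;

final class StddevFactorySketch<V extends Number> {

  /** Hypothetical stand-in for Schema.TypeName. */
  enum Kind { INT32, DOUBLE }

  private final boolean isSample;
  private final Function<BigDecimal, V> converter;

  private StddevFactorySketch(boolean isSample, Function<BigDecimal, V> converter) {
    this.isSample = isSample;
    this.converter = converter;
  }

  /** Hypothetical stand-in for BigDecimalConverter.forSqlType. */
  static Function<BigDecimal, ? extends Number> converterFor(Kind kind) {
    switch (kind) {
      case INT32:
        return BigDecimal::intValue;
      case DOUBLE:
        return BigDecimal::doubleValue;
      default:
        throw new IllegalArgumentException("Unsupported kind: " + kind);
    }
  }

  // Type-based factories only look up a converter and delegate.
  static StddevFactorySketch<? extends Number> newSampleStddev(Kind kind) {
    return newSampleStddev(converterFor(kind));
  }

  static StddevFactorySketch<? extends Number> newPopulationStddev(Kind kind) {
    return newPopulationStddev(converterFor(kind));
  }

  // Converter-based factories are the single place that calls the constructor.
  static <V extends Number> StddevFactorySketch<V> newSampleStddev(
      Function<BigDecimal, V> converter) {
    return new StddevFactorySketch<>(true, converter);
  }

  static <V extends Number> StddevFactorySketch<V> newPopulationStddev(
      Function<BigDecimal, V> converter) {
    return new StddevFactorySketch<>(false, converter);
  }

  boolean isSample() {
    return isSample;
  }

  /** Applies the configured converter to an already computed standard deviation. */
  V convert(BigDecimal stddev) {
    return converter.apply(stddev);
  }
}
```

With this shape, a caller that needs an output type without a predefined
converter can pass its own, for example
`StddevFactorySketch.newPopulationStddev(BigDecimal::longValue)`.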