iemejia commented on a change in pull request #16856:
URL: https://github.com/apache/beam/pull/16856#discussion_r808125396
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CountIf.java
##########
@@ -27,49 +30,41 @@
public class CountIf {
private CountIf() {}
- public static CountIfFn combineFn() {
- return new CountIf.CountIfFn();
+ public static Combine.CombineFn<Boolean, ?, Long> combineFn() {
+ return new CountIfFn();
}
- public static class CountIfFn extends Combine.CombineFn<Boolean,
CountIfFn.Accum, Long> {
+ public static class CountIfFn extends Combine.CombineFn<Boolean, long[],
Long> {
+ private final Combine.CombineFn<Boolean, long[], Long> countFn =
+ (Combine.CombineFn<Boolean, long[], Long>) Count.<Boolean>combineFn();
- public static class Accum implements Serializable {
- boolean isExpressionFalse = true;
- long countIfResult = 0L;
+ @Override
+ public long[] createAccumulator() {
+ return countFn.createAccumulator();
}
@Override
- public Accum createAccumulator() {
- return new Accum();
+ public long[] addInput(long[] accumulator, Boolean input) {
+ if (Boolean.TRUE.equals(input)) {
+ countFn.addInput(accumulator, input);
+ }
+ return accumulator;
}
@Override
- public Accum addInput(Accum accum, Boolean input) {
- if (input) {
- accum.isExpressionFalse = false;
- accum.countIfResult += 1;
- }
- return accum;
+ public long[] mergeAccumulators(Iterable<long[]> accumulators) {
+ return countFn.mergeAccumulators(accumulators);
}
@Override
- public Accum mergeAccumulators(Iterable<Accum> accums) {
- CountIfFn.Accum merged = createAccumulator();
- for (CountIfFn.Accum accum : accums) {
- if (!accum.isExpressionFalse) {
- merged.isExpressionFalse = false;
- merged.countIfResult += accum.countIfResult;
- }
- }
- return merged;
+ public Long extractOutput(long[] accumulator) {
+ return countFn.extractOutput(accumulator);
}
@Override
- public Long extractOutput(Accum accum) {
- if (!accum.isExpressionFalse) {
- return accum.countIfResult;
- }
- return 0L;
+ public Coder<long[]> getAccumulatorCoder(CoderRegistry registry,
Coder<Boolean> inputCoder)
+ throws CannotProvideCoderException {
+ return countFn.getAccumulatorCoder(registry, inputCoder);
Review comment:
Technically we did not have a specified `Coder` so it was breaking when
running on a distributed system as the JIRA ticket reported so backwards
compatibility seems less of an issue ;)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]