This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggregationOp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ac2fe2aff830d78ac3a5a5e23afc0a71a03f9456 Author: Alima777 <[email protected]> AuthorDate: Mon May 9 21:09:12 2022 +0800 accumulator return null while no data --- .../iotdb/db/mpp/aggregation/AvgAccumulator.java | 23 ++++++++++++-- .../iotdb/db/mpp/aggregation/CountAccumulator.java | 20 ++++++++++-- .../db/mpp/aggregation/ExtremeAccumulator.java | 12 ++++++++ .../db/mpp/aggregation/FirstValueAccumulator.java | 9 ++++++ .../db/mpp/aggregation/LastValueAccumulator.java | 16 ++++++++++ .../mpp/aggregation/LastValueDescAccumulator.java | 36 +--------------------- .../db/mpp/aggregation/MaxTimeAccumulator.java | 20 ++++++++++-- .../db/mpp/aggregation/MaxTimeDescAccumulator.java | 15 +-------- .../db/mpp/aggregation/MaxValueAccumulator.java | 11 +++++++ .../db/mpp/aggregation/MinTimeAccumulator.java | 15 +++++++-- .../iotdb/db/mpp/aggregation/SumAccumulator.java | 9 ++++-- 11 files changed, 125 insertions(+), 61 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java index 28ef1e79ec..fab0dcf662 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java @@ -71,12 +71,14 @@ public class AvgAccumulator implements Accumulator { if (partialResult[0].isNull(0)) { return; } + initResult = true; countValue += partialResult[0].getLong(0); sumValue += partialResult[1].getDouble(0); } @Override public void addStatistics(Statistics statistics) { + initResult = true; countValue += statistics.getCount(); if (statistics instanceof IntegerStatistics) { sumValue += statistics.getSumLongValue(); @@ -89,6 +91,10 @@ public class AvgAccumulator implements Accumulator { @Override public void setFinal(Column finalResult) { reset(); + if (finalResult.isNull(0)) { + return; + } + initResult = true; countValue = 1; sumValue = finalResult.getDouble(0); } @@ -96,17 +102,27 @@ public class AvgAccumulator implements Accumulator { @Override public void outputIntermediate(ColumnBuilder[] columnBuilders) { checkArgument(columnBuilders.length == 2, "partialResult of Avg should be 2"); - columnBuilders[0].writeLong(countValue); - columnBuilders[1].writeDouble(sumValue); + if (!initResult) { + columnBuilders[0].appendNull(); + columnBuilders[1].appendNull(); + } else { + columnBuilders[0].writeLong(countValue); + columnBuilders[1].writeDouble(sumValue); + } } @Override public void outputFinal(ColumnBuilder columnBuilder) { - columnBuilder.writeDouble(sumValue / countValue); + if (!initResult) { + columnBuilder.appendNull(); + } else { + columnBuilder.writeDouble(sumValue / countValue); + } } @Override public void reset() { + initResult = false; this.countValue = 0; this.sumValue = 0.0; } @@ -179,6 +195,7 @@ public class AvgAccumulator implements Accumulator { break; } if (!column[1].isNull(i)) { + initResult = true; countValue++; sumValue += column[1].getDouble(i); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java index 4750cdf01e..573917b1cd 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java @@ -45,6 +45,7 @@ public class CountAccumulator implements Accumulator { break; } if (!column[1].isNull(i)) { + initResult = true; countValue++; } } @@ -57,17 +58,23 @@ public class CountAccumulator implements Accumulator { if (partialResult[0].isNull(0)) { return; } + initResult = true; countValue += partialResult[0].getLong(0); } @Override public void addStatistics(Statistics statistics) { + initResult = true; countValue += statistics.getCount(); } // finalResult should be single column, like: | finalCountValue | @Override public void setFinal(Column finalResult) { + if (finalResult.isNull(0)) { + return; + } + initResult = true; countValue = finalResult.getLong(0); } @@ -75,16 +82,25 @@ public class CountAccumulator implements Accumulator { @Override public void outputIntermediate(ColumnBuilder[] columnBuilders) { checkArgument(columnBuilders.length == 1, "partialResult of Count should be 1"); - columnBuilders[0].writeLong(countValue); + if (!initResult) { + columnBuilders[0].appendNull(); + } else { + columnBuilders[0].writeLong(countValue); + } } @Override public void outputFinal(ColumnBuilder columnBuilder) { - columnBuilder.writeLong(countValue); + if (!initResult) { + columnBuilder.appendNull(); + } else { + columnBuilder.writeLong(countValue); + } } @Override public void reset() { + initResult = false; this.countValue = 0; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java index 78e7c986c7..fa616b52c5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java @@ -121,6 +121,10 @@ public class ExtremeAccumulator implements Accumulator { @Override public void setFinal(Column finalResult) { + if (finalResult.isNull(0)) { + return; + } + initResult = true; extremeResult.setObject(finalResult.getObject(0)); } @@ -128,6 +132,10 @@ public class ExtremeAccumulator implements Accumulator { @Override public void outputIntermediate(ColumnBuilder[] columnBuilders) { checkArgument(columnBuilders.length == 1, "partialResult of ExtremeValue should be 1"); + if (!initResult) { + columnBuilders[0].appendNull(); + return; + } switch (seriesDataType) { case INT32: columnBuilders[0].writeInt(extremeResult.getInt()); @@ -151,6 +159,10 @@ public class ExtremeAccumulator implements Accumulator { @Override public void outputFinal(ColumnBuilder columnBuilder) { + if (!initResult) { + columnBuilder.appendNull(); + return; + } switch (seriesDataType) { case INT32: columnBuilder.writeInt(extremeResult.getInt()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java index 908bbf4605..5677d936b0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java @@ -130,6 +130,7 @@ public class FirstValueAccumulator implements Accumulator { @Override public void setFinal(Column finalResult) { reset(); + hasCandidateResult = true; firstValue.setObject(finalResult.getObject(0)); } @@ -137,6 +138,10 @@ public class FirstValueAccumulator implements Accumulator { @Override public void outputIntermediate(ColumnBuilder[] columnBuilders) { checkArgument(columnBuilders.length == 2, "partialResult of FirstValue should be 2"); + if (!hasCandidateResult) { + columnBuilders[0].appendNull(); + return; + } switch (seriesDataType) { case INT32: columnBuilders[0].writeInt(firstValue.getInt()); @@ -165,6 +170,10 @@ public class FirstValueAccumulator implements Accumulator { @Override public void outputFinal(ColumnBuilder columnBuilder) { + if (!hasCandidateResult) { + columnBuilder.appendNull(); + return; + } switch (seriesDataType) { case INT32: columnBuilder.writeInt(firstValue.getInt()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java index f85dd8f5be..8699f650b1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java @@ -35,6 +35,7 @@ public class LastValueAccumulator implements Accumulator { protected final TSDataType seriesDataType; protected TsPrimitiveType lastValue; protected long maxTime = Long.MIN_VALUE; + protected boolean initResult = false; public LastValueAccumulator(TSDataType seriesDataType) { this.seriesDataType = seriesDataType; @@ -136,6 +137,10 @@ public class LastValueAccumulator implements Accumulator { @Override public void outputIntermediate(ColumnBuilder[] columnBuilders) { checkArgument(columnBuilders.length == 2, "partialResult of LastValue should be 2"); + if (!initResult) { + columnBuilders[0].appendNull(); + return; + } switch (seriesDataType) { case INT32: columnBuilders[0].writeInt(lastValue.getInt()); @@ -164,6 +169,10 @@ public class LastValueAccumulator implements Accumulator { @Override public void outputFinal(ColumnBuilder columnBuilder) { + if (!initResult) { + columnBuilder.appendNull(); + return; + } switch (seriesDataType) { case INT32: columnBuilder.writeInt(lastValue.getInt()); @@ -191,6 +200,7 @@ public class LastValueAccumulator implements Accumulator { @Override public void reset() { + initResult = false; this.maxTime = Long.MIN_VALUE; this.lastValue.reset(); } @@ -220,6 +230,7 @@ public class LastValueAccumulator implements Accumulator { } protected void updateIntLastValue(int value, long curTime) { + initResult = true; if (curTime > maxTime) { maxTime = curTime; lastValue.setInt(value); @@ -236,6 +247,7 @@ public class LastValueAccumulator implements Accumulator { } protected void updateLongLastValue(long value, long curTime) { + initResult = true; if (curTime > maxTime) { maxTime = curTime; lastValue.setLong(value); @@ -252,6 +264,7 @@ public class LastValueAccumulator implements Accumulator { } protected void updateFloatLastValue(float value, long curTime) { + initResult = true; if (curTime > maxTime) { maxTime = curTime; lastValue.setFloat(value); @@ -268,6 +281,7 @@ public class LastValueAccumulator implements Accumulator { } protected void updateDoubleLastValue(double value, long curTime) { + initResult = true; if (curTime > maxTime) { maxTime = curTime; lastValue.setDouble(value); @@ -284,6 +298,7 @@ public class LastValueAccumulator implements Accumulator { } protected void updateBooleanLastValue(boolean value, long curTime) { + initResult = true; if (curTime > maxTime) { maxTime = curTime; lastValue.setBoolean(value); @@ -300,6 +315,7 @@ public class LastValueAccumulator implements Accumulator { } protected void updateBinaryLastValue(Binary value, long curTime) { + initResult = true; if (curTime > maxTime) { maxTime = curTime; lastValue.setBinary(value); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java index 3360bad7a3..9a58e3f17d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java @@ -22,24 +22,20 @@ package org.apache.iotdb.db.mpp.aggregation; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.column.Column; -import org.apache.iotdb.tsfile.utils.Binary; public class LastValueDescAccumulator extends LastValueAccumulator { - private boolean hasCandidateResult = false; - public LastValueDescAccumulator(TSDataType seriesDataType) { super(seriesDataType); } @Override public boolean hasFinalResult() { - return hasCandidateResult; + return initResult; } @Override public void reset() { - hasCandidateResult = false; super.reset(); } @@ -102,34 +98,4 @@ public class LastValueDescAccumulator extends LastValueAccumulator { } } } - - protected void updateIntLastValue(int value, long curTime) { - hasCandidateResult = true; - super.updateIntLastValue(value, curTime); - } - - protected void updateLongLastValue(long value, long curTime) { - hasCandidateResult = true; - super.updateLongLastValue(value, curTime); - } - - protected void updateFloatLastValue(float value, long curTime) { - hasCandidateResult = true; - super.updateFloatLastValue(value, curTime); - } - - protected void updateDoubleLastValue(double value, long curTime) { - hasCandidateResult = true; - super.updateDoubleLastValue(value, curTime); - } - - protected void updateBooleanLastValue(boolean value, long curTime) { - hasCandidateResult = true; - super.updateBooleanLastValue(value, curTime); - } - - protected void updateBinaryLastValue(Binary value, long curTime) { - hasCandidateResult = true; - super.updateBinaryLastValue(value, curTime); - } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java index cdd2dfd563..be63c2da5e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java @@ -30,7 +30,7 @@ import static com.google.common.base.Preconditions.checkArgument; public class MaxTimeAccumulator implements Accumulator { protected long maxTime = Long.MIN_VALUE; - private boolean initResult = false; + protected boolean initResult = false; public MaxTimeAccumulator() {} @@ -64,6 +64,10 @@ public class MaxTimeAccumulator implements Accumulator { // finalResult should be single column, like: | finalMaxTime | @Override public void setFinal(Column finalResult) { + if (finalResult.isNull(0)) { + return; + } + initResult = true; maxTime = finalResult.getLong(0); } @@ -71,16 +75,25 @@ public class MaxTimeAccumulator implements Accumulator { @Override public void outputIntermediate(ColumnBuilder[] columnBuilders) { checkArgument(columnBuilders.length == 1, "partialResult of MaxTime should be 1"); - columnBuilders[0].writeLong(maxTime); + if (!initResult) { + columnBuilders[0].appendNull(); + } else { + columnBuilders[0].writeLong(maxTime); + } } @Override public void outputFinal(ColumnBuilder columnBuilder) { - columnBuilder.writeLong(maxTime); + if (!initResult) { + columnBuilder.appendNull(); + } else { + columnBuilder.writeLong(maxTime); + } } @Override public void reset() { + initResult = false; this.maxTime = Long.MIN_VALUE; } @@ -100,6 +113,7 @@ public class MaxTimeAccumulator implements Accumulator { } protected void updateMaxTime(long curTime) { + initResult = true; maxTime = Math.max(maxTime, curTime); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java index b753e9ca3b..6159d030e4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java @@ -24,8 +24,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.Column; public class MaxTimeDescAccumulator extends MaxTimeAccumulator { - private boolean hasCandidateResult = false; - // Column should be like: | Time | Value | // Value is used to judge isNull() @Override @@ -41,17 +39,6 @@ public class MaxTimeDescAccumulator extends MaxTimeAccumulator { @Override public boolean hasFinalResult() { - return hasCandidateResult; - } - - @Override - public void reset() { - hasCandidateResult = false; - super.reset(); - } - - protected void updateMaxTime(long curTime) { - hasCandidateResult = true; - super.updateMaxTime(curTime); + return initResult; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java index 606fce0a8a..cc699dabff 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java @@ -119,6 +119,9 @@ public class MaxValueAccumulator implements Accumulator { // finalResult should be single column, like: | finalCountValue | @Override public void setFinal(Column finalResult) { + if (finalResult.isNull(0)) { + return; + } maxResult.setObject(finalResult.getObject(0)); } @@ -126,6 +129,10 @@ public class MaxValueAccumulator implements Accumulator { @Override public void outputIntermediate(ColumnBuilder[] columnBuilders) { checkArgument(columnBuilders.length == 1, "partialResult of MaxValue should be 1"); + if (!initResult) { + columnBuilders[0].appendNull(); + return; + } switch (seriesDataType) { case INT32: columnBuilders[0].writeInt(maxResult.getInt()); @@ -149,6 +156,10 @@ public class MaxValueAccumulator implements Accumulator { @Override public void outputFinal(ColumnBuilder columnBuilder) { + if (!initResult) { + columnBuilder.appendNull(); + return; + } switch (seriesDataType) { case INT32: columnBuilder.writeInt(maxResult.getInt()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java index 6969e82aa5..8652af5ab6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java @@ -65,6 +65,9 @@ public class MinTimeAccumulator implements Accumulator { // finalResult should be single column, like: | finalMinTime | @Override public void setFinal(Column finalResult) { + if (finalResult.isNull(0)) { + return; + } minTime = finalResult.getLong(0); } @@ -72,12 +75,20 @@ public class MinTimeAccumulator implements Accumulator { @Override public void outputIntermediate(ColumnBuilder[] columnBuilders) { checkArgument(columnBuilders.length == 1, "partialResult of MinTime should be 1"); - columnBuilders[0].writeLong(minTime); + if (!hasCandidateResult) { + columnBuilders[0].appendNull(); + } else { + columnBuilders[0].writeLong(minTime); + } } @Override public void outputFinal(ColumnBuilder columnBuilder) { - columnBuilder.writeLong(minTime); + if (!hasCandidateResult) { + columnBuilder.appendNull(); + } else { + columnBuilder.writeLong(minTime); + } } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java index 5672e970fb..512fbf4bcf 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java @@ -71,11 +71,13 @@ public class SumAccumulator implements Accumulator { if (partialResult[0].isNull(0)) { return; } + initResult = true; sumValue += partialResult[0].getDouble(0); } @Override public void addStatistics(Statistics statistics) { + initResult = true; if (statistics instanceof IntegerStatistics) { sumValue += statistics.getSumLongValue(); } else { @@ -99,20 +101,23 @@ public class SumAccumulator implements Accumulator { checkArgument(columnBuilders.length == 1, "partialResult of Sum should be 1"); if (!initResult) { columnBuilders[0].appendNull(); + } else { + columnBuilders[0].writeDouble(sumValue); } - columnBuilders[0].writeDouble(sumValue); } @Override public void outputFinal(ColumnBuilder columnBuilder) { if (!initResult) { columnBuilder.appendNull(); + } else { + columnBuilder.writeDouble(sumValue); } - columnBuilder.writeDouble(sumValue); } @Override public void reset() { + initResult = false; this.sumValue = 0; }
