This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/LTS-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 87a942b28567ff64f40135bac165bda247940f9e Author: Lei Rui <[email protected]> AuthorDate: Fri Feb 2 01:56:30 2024 +0800 add metrics; unify allinone --- .../groupby/GroupByWithoutValueFilterDataSet.java | 359 ++++++++++------- .../groupby/LocalGroupByExecutorTri_ILTS.java | 1 + .../groupby/LocalGroupByExecutorTri_LTTB.java | 2 + .../groupby/LocalGroupByExecutorTri_M4.java | 208 +++++----- .../LocalGroupByExecutorTri_M4_deprecated.java | 446 +++++++++++++++++++++ .../groupby/LocalGroupByExecutorTri_MinMax.java | 148 +++++-- ...ocalGroupByExecutorTri_MinMaxPreselection.java} | 8 +- .../apache/iotdb/db/integration/tri/MyTest_M4.java | 4 +- .../iotdb/db/integration/tri/MyTest_MinMax.java | 6 +- 9 files changed, 902 insertions(+), 280 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java index 0bfdd58c8af..a51924bb9d8 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java @@ -135,20 +135,19 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { @Override public RowRecord nextWithoutConstraint() throws IOException { - if (CONFIG.getEnableTri().equals("MinMax")) { - return nextWithoutConstraintTri_MinMax(); + if (CONFIG.getEnableTri().equals("MinMax") + || CONFIG.getEnableTri().equals("M4") + || CONFIG.getEnableTri().equals("LTTB") + || CONFIG.getEnableTri().equals("ILTS")) { + return nextWithoutConstraintTri_allInOne(); } else if (CONFIG.getEnableTri().equals("MinMaxLTTB")) { return nextWithoutConstraintTri_MinMaxLTTB(); - } else if (CONFIG.getEnableTri().equals("M4")) { - return nextWithoutConstraintTri_M4(); - } else if (CONFIG.getEnableTri().equals("LTTB") || CONFIG.getEnableTri().equals("ILTS")) { - return nextWithoutConstraintTri_LTTB(); } else { return nextWithoutConstraint_raw(); } } - public RowRecord nextWithoutConstraintTri_LTTB() throws IOException { + public RowRecord nextWithoutConstraintTri_allInOne() throws IOException { RowRecord record; try { GroupByExecutor executor = null; @@ -165,7 +164,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { List<AggregateResult> aggregations = executor.calcResult(startTime, startTime + interval, startTime, endTime, interval); MinMaxInfo minMaxInfo = (MinMaxInfo) aggregations.get(0).getResult(); - series.append(minMaxInfo.val); // 对于LTTB来说,MinValueAggrResult的[t]也无意义,因为只需要val + series.append(minMaxInfo.val); // MinValueAggrResult的[t]也无意义,因为只需要val // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString())) // 注意sql第一项一定要是min_value因为以后会用到record.addField(series, TSDataType.MIN_MAX_INT64) @@ -376,135 +375,214 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { return record; } - public RowRecord nextWithoutConstraintTri_M4() throws IOException { - RowRecord record; - try { - GroupByExecutor executor = null; - for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) { - executor = pathToExecutorEntry.getValue(); // assume only one series here - break; - } - - // concat results into a string - record = new RowRecord(0); - StringBuilder series = new StringBuilder(); - // 全局首点(对于M4来说全局首尾点只是输出不会影响到其它桶的采点) - series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(","); - - for (long localCurStartTime = startTime; - localCurStartTime + interval <= endTime; - // 注意有等号!因为左闭右开 - // + interval to make the last bucket complete - // e.g, T=11,nout=3,interval=floor(11/3)=3, - // [0,3),[3,6),[6,9), no need incomplete [9,11) - // then the number of buckets must be Math.floor((endTime-startTime)/interval) - localCurStartTime += interval) { // not change real curStartTime&curEndTime - // attention the returned aggregations need deep copy if using directly - List<AggregateResult> aggregations = - executor.calcResult( - localCurStartTime, - localCurStartTime + interval, - startTime, - endTime, - interval); // attention - - // min_value(s0), max_value(s0),min_time(s0), max_time(s0), first_value(s0), last_value(s0) - // minValue[bottomTime] - series.append(aggregations.get(0).getResult()).append(","); - // maxValue[topTime] - series.append(aggregations.get(1).getResult()).append(","); - // firstValue[firstTime] - series - .append(aggregations.get(4).getResult()) - .append("[") - .append(aggregations.get(2).getResult()) - .append("]") - .append(","); - // lastValue[lastTime] - series - .append(aggregations.get(5).getResult()) - .append("[") - .append(aggregations.get(3).getResult()) - .append("]") - .append(","); - } - - // 全局尾点(对于M4来说全局首尾点只是输出不会影响到其它桶的采点) - series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(","); - - // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString())) - // 注意sql第一项一定要是min_value因为以后会用到record.addField(series, TSDataType.MIN_MAX_INT64) - // 把所有序列组装成string放在第一行第二列里,否则field类型和TSDataType.MIN_MAX_INT64对不上的会有问题。 - record.addField(series, TSDataType.MIN_MAX_INT64); - - } catch (QueryProcessException e) { - logger.error("GroupByWithoutValueFilterDataSet execute has error", e); - throw new IOException(e.getMessage(), e); - } - - // in the end, make the next hasNextWithoutConstraint() false - // as we already fetch all here - curStartTime = endTime; - hasCachedTimeInterval = false; - - return record; - } - - public RowRecord nextWithoutConstraintTri_MinMax() throws IOException { - RowRecord record; - try { - GroupByExecutor executor = null; - for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) { - executor = pathToExecutorEntry.getValue(); // assume only one series here - break; - } - - // concat results into a string - record = new RowRecord(0); - StringBuilder series = new StringBuilder(); - // 全局首点(对于MinMax来说全局首尾点只是输出不会影响到其它桶的采点) - series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(","); - - for (long localCurStartTime = startTime; - localCurStartTime + interval <= endTime; - // 注意有等号!因为左闭右开 - // + interval to make the last bucket complete - // e.g, T=11,nout=3,interval=floor(11/3)=3, - // [0,3),[3,6),[6,9), no need incomplete [9,11) - // then the number of buckets must be Math.floor((endTime-startTime)/interval) - localCurStartTime += interval) { // not change real curStartTime&curEndTime - // attention the returned aggregations need deep copy if using directly - List<AggregateResult> aggregations = - executor.calcResult( - localCurStartTime, - localCurStartTime + interval, - startTime, - endTime, - interval); // attention - series.append(aggregations.get(0).getResult()).append(","); // BPv[BPt] - series.append(aggregations.get(1).getResult()).append(","); // TPv[TPt] - } - - // 全局尾点(对于MinMax来说全局首尾点只是输出不会影响到其它桶的采点) - series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(","); - - // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString())) - // 注意sql第一项一定要是min_value因为以后会用到record.addField(series, TSDataType.MIN_MAX_INT64) - // 把所有序列组装成string放在第一行第二列里,否则field类型和TSDataType.MIN_MAX_INT64对不上的会有问题。 - record.addField(series, TSDataType.MIN_MAX_INT64); - - } catch (QueryProcessException e) { - logger.error("GroupByWithoutValueFilterDataSet execute has error", e); - throw new IOException(e.getMessage(), e); - } - - // in the end, make the next hasNextWithoutConstraint() false - // as we already fetch all here - curStartTime = endTime; - hasCachedTimeInterval = false; - - return record; - } + // public RowRecord nextWithoutConstraintTri_M4() throws IOException { + // RowRecord record; + // try { + // GroupByExecutor executor = null; + // for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) { + // executor = pathToExecutorEntry.getValue(); // assume only one series here + // break; + // } + // + // // concat results into a string + // record = new RowRecord(0); + // StringBuilder series = new StringBuilder(); + // // 全局首点(对于M4来说全局首尾点只是输出不会影响到其它桶的采点) + // + // series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(","); + // + // for (long localCurStartTime = startTime; + // localCurStartTime + interval <= endTime; + // // 注意有等号!因为左闭右开 + // // + interval to make the last bucket complete + // // e.g, T=11,nout=3,interval=floor(11/3)=3, + // // [0,3),[3,6),[6,9), no need incomplete [9,11) + // // then the number of buckets must be Math.floor((endTime-startTime)/interval) + // localCurStartTime += interval) { // not change real curStartTime&curEndTime + // // attention the returned aggregations need deep copy if using directly + // List<AggregateResult> aggregations = + // executor.calcResult( + // localCurStartTime, + // localCurStartTime + interval, + // startTime, + // endTime, + // interval); // attention + // + // // min_value(s0), max_value(s0),min_time(s0), max_time(s0), first_value(s0), + // last_value(s0) + // // minValue[bottomTime] + // series.append(aggregations.get(0).getResult()).append(","); + // // maxValue[topTime] + // series.append(aggregations.get(1).getResult()).append(","); + // // firstValue[firstTime] + // series + // .append(aggregations.get(4).getResult()) + // .append("[") + // .append(aggregations.get(2).getResult()) + // .append("]") + // .append(","); + // // lastValue[lastTime] + // series + // .append(aggregations.get(5).getResult()) + // .append("[") + // .append(aggregations.get(3).getResult()) + // .append("]") + // .append(","); + // } + // + // // 全局尾点(对于M4来说全局首尾点只是输出不会影响到其它桶的采点) + // + // series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(","); + // + // // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString())) + // // 注意sql第一项一定要是min_value因为以后会用到record.addField(series, TSDataType.MIN_MAX_INT64) + // // 把所有序列组装成string放在第一行第二列里,否则field类型和TSDataType.MIN_MAX_INT64对不上的会有问题。 + // record.addField(series, TSDataType.MIN_MAX_INT64); + // + // } catch (QueryProcessException e) { + // logger.error("GroupByWithoutValueFilterDataSet execute has error", e); + // throw new IOException(e.getMessage(), e); + // } + // + // // in the end, make the next hasNextWithoutConstraint() false + // // as we already fetch all here + // curStartTime = endTime; + // hasCachedTimeInterval = false; + // + // return record; + // } + + // public RowRecord nextWithoutConstraintTri_MinMax() throws IOException { + // RowRecord record; + // try { + // GroupByExecutor executor = null; + // for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) { + // executor = pathToExecutorEntry.getValue(); // assume only one series here + // break; + // } + // + // // concat results into a string + // record = new RowRecord(0); + // StringBuilder series = new StringBuilder(); + // + // // all bucket results as string in value of MinValueAggrResult + // List<AggregateResult> aggregations = + // executor.calcResult(startTime, startTime + interval, startTime, endTime, interval); + // MinMaxInfo minMaxInfo = (MinMaxInfo) aggregations.get(0).getResult(); + // series.append(minMaxInfo.val); // MinValueAggrResult的[t]也无意义,因为只需要val + // + // // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString())) + // // 注意sql第一项一定要是min_value因为以后会用到record.addField(series, TSDataType.MIN_MAX_INT64) + // // 把所有序列组装成string放在第一行第二列里,否则field类型和TSDataType.MIN_MAX_INT64对不上的会有问题。 + // record.addField(series, TSDataType.MIN_MAX_INT64); + // + // } catch (QueryProcessException e) { + // logger.error("GroupByWithoutValueFilterDataSet execute has error", e); + // throw new IOException(e.getMessage(), e); + // } + // + // // in the end, make the next hasNextWithoutConstraint() false + // // as we already fetch all here + // curStartTime = endTime; + // hasCachedTimeInterval = false; + // + // return record; + // } + + // public RowRecord nextWithoutConstraintTri_M4() throws IOException { + // RowRecord record; + // try { + // GroupByExecutor executor = null; + // for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) { + // executor = pathToExecutorEntry.getValue(); // assume only one series here + // break; + // } + // + // // concat results into a string + // record = new RowRecord(0); + // StringBuilder series = new StringBuilder(); + // + // // all bucket results as string in value of MinValueAggrResult + // List<AggregateResult> aggregations = + // executor.calcResult(startTime, startTime + interval, startTime, endTime, interval); + // MinMaxInfo minMaxInfo = (MinMaxInfo) aggregations.get(0).getResult(); + // series.append(minMaxInfo.val); // MinValueAggrResult的[t]也无意义,因为只需要val + // + // // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString())) + // // 注意sql第一项一定要是min_value因为以后会用到record.addField(series, TSDataType.MIN_MAX_INT64) + // // 把所有序列组装成string放在第一行第二列里,否则field类型和TSDataType.MIN_MAX_INT64对不上的会有问题。 + // record.addField(series, TSDataType.MIN_MAX_INT64); + // + // } catch (QueryProcessException e) { + // logger.error("GroupByWithoutValueFilterDataSet execute has error", e); + // throw new IOException(e.getMessage(), e); + // } + // + // // in the end, make the next hasNextWithoutConstraint() false + // // as we already fetch all here + // curStartTime = endTime; + // hasCachedTimeInterval = false; + // + // return record; + // } + + // public RowRecord nextWithoutConstraintTri_MinMax() throws IOException { + // RowRecord record; + // try { + // GroupByExecutor executor = null; + // for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) { + // executor = pathToExecutorEntry.getValue(); // assume only one series here + // break; + // } + // + // // concat results into a string + // record = new RowRecord(0); + // StringBuilder series = new StringBuilder(); + // // 全局首点(对于MinMax来说全局首尾点只是输出不会影响到其它桶的采点) + // + // series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(","); + // + // for (long localCurStartTime = startTime; + // localCurStartTime + interval <= endTime; + // // 注意有等号!因为左闭右开 + // // + interval to make the last bucket complete + // // e.g, T=11,nout=3,interval=floor(11/3)=3, + // // [0,3),[3,6),[6,9), no need incomplete [9,11) + // // then the number of buckets must be Math.floor((endTime-startTime)/interval) + // localCurStartTime += interval) { // not change real curStartTime&curEndTime + // // attention the returned aggregations need deep copy if using directly + // List<AggregateResult> aggregations = + // executor.calcResult( + // localCurStartTime, + // localCurStartTime + interval, + // startTime, + // endTime, + // interval); // attention + // series.append(aggregations.get(0).getResult()).append(","); // BPv[BPt] + // series.append(aggregations.get(1).getResult()).append(","); // TPv[TPt] + // } + // + // // 全局尾点(对于MinMax来说全局首尾点只是输出不会影响到其它桶的采点) + // + // series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(","); + // + // // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString())) + // // 注意sql第一项一定要是min_value因为以后会用到record.addField(series, TSDataType.MIN_MAX_INT64) + // // 把所有序列组装成string放在第一行第二列里,否则field类型和TSDataType.MIN_MAX_INT64对不上的会有问题。 + // record.addField(series, TSDataType.MIN_MAX_INT64); + // + // } catch (QueryProcessException e) { + // logger.error("GroupByWithoutValueFilterDataSet execute has error", e); + // throw new IOException(e.getMessage(), e); + // } + // + // // in the end, make the next hasNextWithoutConstraint() false + // // as we already fetch all here + // curStartTime = endTime; + // hasCachedTimeInterval = false; + // + // return record; + // } public RowRecord nextWithoutConstraint_raw() throws IOException { if (!hasCachedTimeInterval) { @@ -574,8 +652,11 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { TsFileFilter fileFilter, boolean ascending) throws StorageEngineException, QueryProcessException { - if (CONFIG.getEnableTri().equals("MinMax") || CONFIG.getEnableTri().equals("MinMaxLTTB")) { - return new LocalGroupByExecutorTri_MinMax( + if (CONFIG.getEnableTri().equals("MinMax")) { + return new LocalGroupByExecutorTri_MinMax( // TODO + path, allSensors, dataType, context, timeFilter, fileFilter, ascending); + } else if (CONFIG.getEnableTri().equals("MinMaxLTTB")) { + return new LocalGroupByExecutorTri_MinMaxPreselection( // TODO path, allSensors, dataType, context, timeFilter, fileFilter, ascending); } else if (CONFIG.getEnableTri().equals("M4")) { return new LocalGroupByExecutorTri_M4( diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java index 7e32ed46de9..29ec858ff9e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java @@ -261,6 +261,7 @@ public class LocalGroupByExecutorTri_ILTS implements GroupByExecutor { // 2. 计算平均点 PageReader pageReader = chunkSuit4Tri.pageReader; for (int j = 0; j < chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) { + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; long timestamp = pageReader.timeBuffer.getLong(j * 8); if (timestamp < rightStartTime) { continue; diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java index 1c802efdb4c..8fe3e792c17 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java @@ -201,6 +201,7 @@ public class LocalGroupByExecutorTri_LTTB implements GroupByExecutor { // 2. 计算平均点 PageReader pageReader = chunkSuit4Tri.pageReader; for (int j = 0; j < chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) { + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; long timestamp = pageReader.timeBuffer.getLong(j * 8); if (timestamp < rightStartTime) { continue; @@ -249,6 +250,7 @@ public class LocalGroupByExecutorTri_LTTB implements GroupByExecutor { int count = chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); int j; for (j = 0; j < count; j++) { + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; long timestamp = pageReader.timeBuffer.getLong(j * 8); if (timestamp < localCurStartTime) { continue; diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4.java index 196e3bb0b9d..22590c54942 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4.java @@ -19,12 +19,13 @@ package org.apache.iotdb.db.query.dataset.groupby; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.query.aggregation.AggregateResult; -import org.apache.iotdb.db.query.aggregation.impl.MaxValueAggrResult; import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; @@ -41,6 +42,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; +import org.apache.iotdb.tsfile.read.common.IOMonitor2; import org.apache.iotdb.tsfile.read.filter.GroupByFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.page.PageReader; @@ -59,6 +61,8 @@ import java.util.Set; public class LocalGroupByExecutorTri_M4 implements GroupByExecutor { + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA"); // Aggregate result buffer of this path @@ -69,6 +73,8 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor { private Filter timeFilter; + private final int N1; // 分桶数 + public LocalGroupByExecutorTri_M4( PartialPath path, Set<String> allSensors, @@ -101,6 +107,12 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor { fileFilter, ascending); + GroupByFilter groupByFilter = (GroupByFilter) timeFilter; + long startTime = groupByFilter.getStartTime(); + long endTime = groupByFilter.getEndTime(); + long interval = groupByFilter.getInterval(); + N1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); // 分桶数 + // unpackAllOverlappedFilesToTimeSeriesMetadata try { // : this might be bad to load all chunk metadata at first @@ -261,81 +273,86 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor { public List<AggregateResult> calcResult( long curStartTime, long curEndTime, long startTime, long endTime, long interval) throws IOException { + // 这里用calcResult一次返回所有buckets结果(可以把MinValueAggrResult的value设为string类型, + // 那就把所有buckets结果作为一个string返回。这样的话返回的[t]是没有意义的,只取valueString) + // 而不是像MinMax那样在nextWithoutConstraintTri_MinMax()里调用calcResult每次计算一个bucket + StringBuilder series = new StringBuilder(); + // clear result cache for (AggregateResult result : results) { result.reset(); } - // long start = System.nanoTime(); - getCurrentChunkListFromFutureChunkList(curStartTime, curEndTime); - // IOMonitor2.addMeasure(Operation.M4_LSM_MERGE_M4_TIME_SPAN, System.nanoTime() - start); + // 全局首点(对于MinMax来说全局首尾点只是输出不会影响到其它桶的采点) + series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(","); + + // Assume no empty buckets + for (int b = 0; b < N1; b++) { + long localCurStartTime = startTime + (b) * interval; + long localCurEndTime = startTime + (b + 1) * interval; + + getCurrentChunkListFromFutureChunkList(localCurStartTime, localCurEndTime); + + if (currentChunkList.size() == 0) { + // System.out.println("MinMax empty currentChunkList"); // TODO debug + // minValue[bottomTime],maxValue[topTime],firstValue[firstTime],lastValue[lastTime] + series.append("null[null],null[null],null[null],null[null],"); + continue; + } - if (currentChunkList.size() == 0) { - return results; + calculateM4(currentChunkList, localCurStartTime, localCurEndTime, series); } - // start = System.nanoTime(); - calculateM4(currentChunkList, curStartTime, curEndTime); - // IOMonitor2.addMeasure(Operation.M4_LSM_FP, System.nanoTime() - start); + // 全局尾点 + series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(","); + + MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); + minValueAggrResult.updateResult(new MinMaxInfo<>(series.toString(), 0)); return results; } - private void calculateM4(List<ChunkSuit4Tri> currentChunkList, long curStartTime, long curEndTime) + private void calculateM4( + List<ChunkSuit4Tri> currentChunkList, + long curStartTime, + long curEndTime, + StringBuilder series) throws IOException { + double minValue = Double.MAX_VALUE; + long bottomTime = -1; + double maxValue = -Double.MAX_VALUE; // Double.MIN_VALUE is positive so do not use it!!! + long topTime = -1; + long firstTime = -1; + double firstValue = 0; + long lastTime = -1; + double lastValue = 0; + for (ChunkSuit4Tri chunkSuit4Tri : currentChunkList) { Statistics statistics = chunkSuit4Tri.chunkMetadata.getStatistics(); if (canUseStatistics(chunkSuit4Tri, curStartTime, curEndTime)) { - // min_value(s0), max_value(s0),min_time(s0), max_time(s0), first_value(s0),last_value(s0) - // update min_time - results - .get(2) - .updateResultUsingValues( - new long[] {chunkSuit4Tri.chunkMetadata.getStartTime()}, - 1, - new Object[] {statistics.getFirstValue()}); - // update first_value - results - .get(4) - .updateResultUsingValues( - new long[] {chunkSuit4Tri.chunkMetadata.getStartTime()}, - 1, - new Object[] {statistics.getFirstValue()}); - // update max_time - results - .get(3) - .updateResultUsingValues( - new long[] {chunkSuit4Tri.chunkMetadata.getEndTime()}, - 1, - new Object[] {statistics.getLastValue()}); - // update last_value - results - .get(5) - .updateResultUsingValues( - new long[] {chunkSuit4Tri.chunkMetadata.getEndTime()}, - 1, - new Object[] {statistics.getLastValue()}); // update BP - MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); - minValueAggrResult.updateResult( - new MinMaxInfo<>(statistics.getMinValue(), statistics.getBottomTimestamp())); + double chunkMinValue = (double) statistics.getMinValue(); + if (chunkMinValue < minValue) { + minValue = chunkMinValue; + bottomTime = statistics.getBottomTimestamp(); + } // update TP - MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); - maxValueAggrResult.updateResult( - new MinMaxInfo<>(statistics.getMaxValue(), statistics.getTopTimestamp())); + double chunkMaxValue = (double) statistics.getMaxValue(); + if (chunkMaxValue > maxValue) { + maxValue = chunkMaxValue; + topTime = statistics.getTopTimestamp(); + } + // update FP + if (firstTime < 0) { + firstTime = statistics.getStartTime(); + firstValue = (double) statistics.getFirstValue(); + } + lastTime = statistics.getEndTime(); // assume sequential data + lastValue = (double) statistics.getLastValue(); } else { // cannot use statistics directly - double minVal = Double.MAX_VALUE; - long bottomTime = -1; - double maxVal = -Double.MAX_VALUE; // Double.MIN_VALUE is positive so do not use it!!! - long topTime = -1; - long firstTime = -1; - double firstValue = 0; - long lastTime = -1; - double lastValue = 0; - // 1. load page data if it hasn't been loaded TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); if (dataType != TSDataType.DOUBLE) { @@ -355,6 +372,7 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor { PageReader pageReader = chunkSuit4Tri.pageReader; int i; for (i = chunkSuit4Tri.lastReadPos; i < count; i++) { + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; long timestamp = pageReader.timeBuffer.getLong(i * 8); if (timestamp < curStartTime) { // 2. read from lastReadPos until the first point fallen within this bucket (if it @@ -375,57 +393,61 @@ public class LocalGroupByExecutorTri_M4 implements GroupByExecutor { } lastTime = timestamp; lastValue = v; - if (v < minVal) { - minVal = v; + if (v < minValue) { + minValue = v; bottomTime = timestamp; } - if (v > maxVal) { - maxVal = v; + if (v > maxValue) { + maxValue = v; topTime = timestamp; } } } - // clear for heap space - if (i >= count) { - // 代表这个chunk已经读完了,后面的bucket不会再用到,所以现在就可以清空内存的page - // 而不是等到下一个bucket的时候再清空,因为有可能currentChunkList里chunks太多,page点同时存在太多,heap space不够 - chunkSuit4Tri.pageReader = null; - } - // 4. update MinMax by traversing points fallen within this bucket - if (topTime >= 0) { - // min_value(s0), max_value(s0),min_time(s0), max_time(s0), first_value(s0),last_value(s0) - // update min_time - results - .get(2) - .updateResultUsingValues(new long[] {firstTime}, 1, new Object[] {firstValue}); - // update first_value - results - .get(4) - .updateResultUsingValues(new long[] {firstTime}, 1, new Object[] {firstValue}); - // update max_time - results - .get(3) - .updateResultUsingValues(new long[] {lastTime}, 1, new Object[] {lastValue}); - // update last_value - results - .get(5) - .updateResultUsingValues(new long[] {lastTime}, 1, new Object[] {lastValue}); - // update BP - MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); - minValueAggrResult.updateResult(new MinMaxInfo<>(minVal, bottomTime)); - // update TP - MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); - maxValueAggrResult.updateResult(new MinMaxInfo<>(maxVal, topTime)); - } + // // clear for heap space + // if (i >= count) { + // // 代表这个chunk已经读完了,后面的bucket不会再用到,所以现在就可以清空内存的page + // // 而不是等到下一个bucket的时候再清空,因为有可能currentChunkList里chunks太多,page点同时存在太多,heap space不够 + // chunkSuit4Tri.pageReader = null; + // } } } + // 记录结果 + if (topTime >= 0) { + // minValue[bottomTime],maxValue[topTime],firstValue[firstTime],lastValue[lastTime] + series + .append(minValue) + .append("[") + .append(bottomTime) + .append("]") + .append(",") + .append(maxValue) + .append("[") + .append(topTime) + .append("]") + .append(",") + .append(firstValue) + .append("[") + .append(firstTime) + .append("]") + .append(",") + .append(lastValue) + .append("[") + .append(lastTime) + .append("]") + .append(","); + } else { + // empty bucket although statistics cover + // minValue[bottomTime],maxValue[topTime],firstValue[firstTime],lastValue[lastTime] + series.append("null[null],null[null],null[null],null[null],"); + } } public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long curStartTime, long curEndTime) { return false; - // long minT = chunkSuit4Tri.chunkMetadata.getStartTime(); - // long maxT = chunkSuit4Tri.chunkMetadata.getEndTime(); - // return minT >= curStartTime && maxT < curEndTime; + // long TP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getTopTimestamp(); + // long BP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getBottomTimestamp(); + // return TP_t >= curStartTime && TP_t < curEndTime && BP_t >= curStartTime && BP_t < + // curEndTime; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4_deprecated.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4_deprecated.java new file mode 100644 index 00000000000..0ede99042a3 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_M4_deprecated.java @@ -0,0 +1,446 @@ +/// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// */ +// +// package org.apache.iotdb.db.query.dataset.groupby; +// +// import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +// import org.apache.iotdb.db.exception.StorageEngineException; +// import org.apache.iotdb.db.exception.query.QueryProcessException; +// import org.apache.iotdb.db.metadata.PartialPath; +// import org.apache.iotdb.db.query.aggregation.AggregateResult; +// import org.apache.iotdb.db.query.aggregation.impl.MaxValueAggrResult; +// import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult; +// import org.apache.iotdb.db.query.context.QueryContext; +// import org.apache.iotdb.db.query.control.QueryResourceManager; +// import org.apache.iotdb.db.query.filter.TsFileFilter; +// import org.apache.iotdb.db.query.reader.series.SeriesReader; +// import org.apache.iotdb.db.utils.FileLoaderUtils; +// import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +// import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +// import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +// import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; +// import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics; +// import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics; +// import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; +// import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; +// import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +// import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; +// import org.apache.iotdb.tsfile.read.filter.GroupByFilter; +// import org.apache.iotdb.tsfile.read.filter.basic.Filter; +// import org.apache.iotdb.tsfile.read.reader.page.PageReader; +// import org.apache.iotdb.tsfile.utils.Pair; +// +// import org.slf4j.Logger; +// import org.slf4j.LoggerFactory; +// +// import java.io.IOException; +// import java.nio.ByteBuffer; +// import java.util.ArrayList; +// import java.util.Comparator; +// import java.util.List; +// import java.util.ListIterator; +// import java.util.Set; +// +// public class LocalGroupByExecutorTri_M4_deprecated implements GroupByExecutor { +// +// private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA"); +// +// // Aggregate result buffer of this path +// private final List<AggregateResult> results = new ArrayList<>(); +// +// private List<ChunkSuit4Tri> currentChunkList; +// private final List<ChunkSuit4Tri> futureChunkList = new ArrayList<>(); +// +// private Filter timeFilter; +// +// public LocalGroupByExecutorTri_M4_deprecated( +// PartialPath path, +// Set<String> allSensors, +// TSDataType dataType, +// QueryContext context, +// Filter timeFilter, +// TsFileFilter fileFilter, +// boolean ascending) +// throws StorageEngineException, QueryProcessException { +// // long start = System.nanoTime(); +// +// // get all data sources +// QueryDataSource queryDataSource = +// QueryResourceManager.getInstance().getQueryDataSource(path, context, this.timeFilter); +// +// // update filter by TTL +// this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); +// +// SeriesReader seriesReader = +// new SeriesReader( +// path, +// allSensors, +// // fix bug: here use the aggregation type as the series data type, +// // not using pageReader.getAllSatisfiedPageData is ok +// dataType, +// context, +// queryDataSource, +// timeFilter, +// null, +// fileFilter, +// ascending); +// +// // unpackAllOverlappedFilesToTimeSeriesMetadata +// try { +// // : this might be bad to load all chunk metadata at first +// futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri()); +// // order futureChunkList by chunk startTime +// futureChunkList.sort( +// new Comparator<ChunkSuit4Tri>() { +// public int compare(ChunkSuit4Tri o1, ChunkSuit4Tri o2) { +// return ((Comparable) (o1.chunkMetadata.getStartTime())) +// .compareTo(o2.chunkMetadata.getStartTime()); +// } +// }); +// +// if (M4_CHUNK_METADATA.isDebugEnabled()) { +// if (timeFilter instanceof GroupByFilter) { +// M4_CHUNK_METADATA.debug( +// "M4_QUERY_PARAM,{},{},{}", +// ((GroupByFilter) timeFilter).getStartTime(), +// ((GroupByFilter) timeFilter).getEndTime(), +// ((GroupByFilter) timeFilter).getInterval()); +// } +// for (ChunkSuit4Tri ChunkSuit4Tri : futureChunkList) { +// Statistics statistics = ChunkSuit4Tri.chunkMetadata.getStatistics(); +// long FP_t = statistics.getStartTime(); +// long LP_t = statistics.getEndTime(); +// long BP_t = statistics.getBottomTimestamp(); +// long TP_t = statistics.getTopTimestamp(); +// switch (statistics.getType()) { +// case INT32: +// int FP_v_int = ((IntegerStatistics) statistics).getFirstValue(); +// int LP_v_int = ((IntegerStatistics) statistics).getLastValue(); +// int BP_v_int = ((IntegerStatistics) statistics).getMinValue(); +// int TP_v_int = ((IntegerStatistics) statistics).getMaxValue(); +// M4_CHUNK_METADATA.debug( +// "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", +// FP_t, +// LP_t, +// BP_t, +// TP_t, +// FP_v_int, +// LP_v_int, +// BP_v_int, +// TP_v_int, +// ChunkSuit4Tri.chunkMetadata.getVersion(), +// ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), +// statistics.getCount()); +// break; +// case INT64: +// long FP_v_long = ((LongStatistics) statistics).getFirstValue(); +// long LP_v_long = ((LongStatistics) statistics).getLastValue(); +// long BP_v_long = ((LongStatistics) statistics).getMinValue(); +// long TP_v_long = ((LongStatistics) statistics).getMaxValue(); +// M4_CHUNK_METADATA.debug( +// "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", +// FP_t, +// LP_t, +// BP_t, +// TP_t, +// FP_v_long, +// LP_v_long, +// BP_v_long, +// TP_v_long, +// ChunkSuit4Tri.chunkMetadata.getVersion(), +// ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), +// statistics.getCount()); +// break; +// case FLOAT: +// float FP_v_float = ((FloatStatistics) statistics).getFirstValue(); +// float LP_v_float = ((FloatStatistics) statistics).getLastValue(); +// float BP_v_float = ((FloatStatistics) statistics).getMinValue(); +// float TP_v_float = ((FloatStatistics) statistics).getMaxValue(); +// M4_CHUNK_METADATA.debug( +// "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", +// FP_t, +// LP_t, +// BP_t, +// TP_t, +// FP_v_float, +// LP_v_float, +// BP_v_float, +// TP_v_float, +// ChunkSuit4Tri.chunkMetadata.getVersion(), +// ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), +// statistics.getCount()); +// break; +// case DOUBLE: +// double FP_v_double = ((DoubleStatistics) statistics).getFirstValue(); +// double LP_v_double = ((DoubleStatistics) statistics).getLastValue(); +// double BP_v_double = ((DoubleStatistics) statistics).getMinValue(); +// double TP_v_double = ((DoubleStatistics) statistics).getMaxValue(); +// M4_CHUNK_METADATA.debug( +// "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", +// FP_t, +// LP_t, +// BP_t, +// TP_t, +// FP_v_double, +// LP_v_double, +// BP_v_double, +// TP_v_double, +// ChunkSuit4Tri.chunkMetadata.getVersion(), +// ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), +// statistics.getCount()); +// break; +// default: +// throw new QueryProcessException("unsupported data type!"); +// } +// } +// } +// +// } catch (IOException e) { +// throw new QueryProcessException(e.getMessage()); +// } +// +// // IOMonitor2.addMeasure(Operation.M4_LSM_INIT_LOAD_ALL_CHUNKMETADATAS, System.nanoTime() - +// // start); +// } +// +// @Override +// public void addAggregateResult(AggregateResult aggrResult) { +// results.add(aggrResult); +// } +// +// private void getCurrentChunkListFromFutureChunkList(long curStartTime, long curEndTime) { +// // IOMonitor2.M4_LSM_status = Operation.M4_LSM_MERGE_M4_TIME_SPAN; +// +// // empty currentChunkList +// currentChunkList = new ArrayList<>(); +// +// // iterate futureChunkList +// ListIterator<ChunkSuit4Tri> itr = futureChunkList.listIterator(); +// while (itr.hasNext()) { +// ChunkSuit4Tri chunkSuit4Tri = itr.next(); +// ChunkMetadata chunkMetadata = chunkSuit4Tri.chunkMetadata; +// long chunkMinTime = chunkMetadata.getStartTime(); +// long chunkMaxTime = chunkMetadata.getEndTime(); +// if (chunkMaxTime < curStartTime) { +// // the chunk falls on the left side of the current M4 interval Ii +// itr.remove(); +// } else if (chunkMinTime >= curEndTime) { +// // the chunk falls on the right side of the current M4 interval Ii, +// // and since futureChunkList is ordered by the startTime of chunkMetadata, +// // the loop can be terminated early. +// break; +// } else if (chunkMaxTime < curEndTime) { +// // this chunk is not related to buckets later +// currentChunkList.add(chunkSuit4Tri); +// itr.remove(); +// } else { +// // this chunk is overlapped with the right border of the current bucket +// currentChunkList.add(chunkSuit4Tri); +// // still keep it in the futureChunkList +// } +// } +// } +// +// @Override +// public List<AggregateResult> calcResult( +// long curStartTime, long curEndTime, long startTime, long endTime, long interval) +// throws IOException { +// // clear result cache +// for (AggregateResult result : results) { +// result.reset(); +// } +// +// // long start = System.nanoTime(); +// getCurrentChunkListFromFutureChunkList(curStartTime, curEndTime); +// // IOMonitor2.addMeasure(Operation.M4_LSM_MERGE_M4_TIME_SPAN, System.nanoTime() - start); +// +// if (currentChunkList.size() == 0) { +// return results; +// } +// +// // start = System.nanoTime(); +// calculateM4(currentChunkList, curStartTime, curEndTime); +// // IOMonitor2.addMeasure(Operation.M4_LSM_FP, System.nanoTime() - start); +// +// return results; +// } +// +// private void calculateM4(List<ChunkSuit4Tri> currentChunkList, long curStartTime, long +// curEndTime) +// throws IOException { +// for (ChunkSuit4Tri chunkSuit4Tri : currentChunkList) { +// +// Statistics statistics = chunkSuit4Tri.chunkMetadata.getStatistics(); +// +// if (canUseStatistics(chunkSuit4Tri, curStartTime, curEndTime)) { +// // min_value(s0), max_value(s0),min_time(s0), max_time(s0), first_value(s0),last_value(s0) +// // update min_time +// results +// .get(2) +// .updateResultUsingValues( +// new long[] {chunkSuit4Tri.chunkMetadata.getStartTime()}, +// 1, +// new Object[] {statistics.getFirstValue()}); +// // update first_value +// results +// .get(4) +// .updateResultUsingValues( +// new long[] {chunkSuit4Tri.chunkMetadata.getStartTime()}, +// 1, +// new Object[] {statistics.getFirstValue()}); +// // update max_time +// results +// .get(3) +// .updateResultUsingValues( +// new long[] {chunkSuit4Tri.chunkMetadata.getEndTime()}, +// 1, +// new Object[] {statistics.getLastValue()}); +// // update last_value +// results +// .get(5) +// .updateResultUsingValues( +// new long[] {chunkSuit4Tri.chunkMetadata.getEndTime()}, +// 1, +// new Object[] {statistics.getLastValue()}); +// // update BP +// MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); +// minValueAggrResult.updateResult( +// new MinMaxInfo<>(statistics.getMinValue(), statistics.getBottomTimestamp())); +// // update TP +// MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); +// maxValueAggrResult.updateResult( +// new MinMaxInfo<>(statistics.getMaxValue(), statistics.getTopTimestamp())); +// } else { // cannot use statistics directly +// +// double minVal = Double.MAX_VALUE; +// long bottomTime = -1; +// double maxVal = -Double.MAX_VALUE; // Double.MIN_VALUE is positive so do not use it!!! +// long topTime = -1; +// long firstTime = -1; +// double firstValue = 0; +// long lastTime = -1; +// double lastValue = 0; +// +// // 1. load page data if it hasn't been loaded +// TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); +// if (dataType != TSDataType.DOUBLE) { +// throw new UnSupportedDataTypeException(String.valueOf(dataType)); +// } +// if (chunkSuit4Tri.pageReader == null) { +// chunkSuit4Tri.pageReader = +// FileLoaderUtils.loadPageReaderList4CPV(chunkSuit4Tri.chunkMetadata, +// this.timeFilter); +// // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, +// // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. +// // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE +// // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS +// // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS! +// } +// +// int count = chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); +// PageReader pageReader = chunkSuit4Tri.pageReader; +// int i; +// for (i = chunkSuit4Tri.lastReadPos; i < count; i++) { +// long timestamp = pageReader.timeBuffer.getLong(i * 8); +// if (timestamp < curStartTime) { +// // 2. read from lastReadPos until the first point fallen within this bucket (if it +// // exists) +// continue; +// } else if (timestamp >= curEndTime) { +// // 3. traverse until the first point fallen right this bucket, also remember to update +// // lastReadPos +// chunkSuit4Tri.lastReadPos = i; +// break; +// } else { +// // 4. update MinMax by traversing points fallen within this bucket +// ByteBuffer valueBuffer = pageReader.valueBuffer; +// double v = valueBuffer.getDouble(pageReader.timeBufferLength + i * 8); +// if (firstTime < 0) { +// firstTime = timestamp; +// firstValue = v; +// } +// lastTime = timestamp; +// lastValue = v; +// if (v < minVal) { +// minVal = v; +// bottomTime = timestamp; +// } +// if (v > maxVal) { +// maxVal = v; +// topTime = timestamp; +// } +// } +// } +// // clear for heap space +// if (i >= count) { +// // 代表这个chunk已经读完了,后面的bucket不会再用到,所以现在就可以清空内存的page +// // 而不是等到下一个bucket的时候再清空,因为有可能currentChunkList里chunks太多,page点同时存在太多,heap space不够 +// chunkSuit4Tri.pageReader = null; +// } +// // 4. update MinMax by traversing points fallen within this bucket +// if (topTime >= 0) { +// // min_value(s0), max_value(s0),min_time(s0), max_time(s0), +// first_value(s0),last_value(s0) +// // update min_time +// results +// .get(2) +// .updateResultUsingValues(new long[] {firstTime}, 1, new Object[] {firstValue}); +// // update first_value +// results +// .get(4) +// .updateResultUsingValues(new long[] {firstTime}, 1, new Object[] {firstValue}); +// // update max_time +// results +// .get(3) +// .updateResultUsingValues(new long[] {lastTime}, 1, new Object[] {lastValue}); +// // update last_value +// results +// .get(5) +// .updateResultUsingValues(new long[] {lastTime}, 1, new Object[] {lastValue}); +// // update BP +// MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); +// minValueAggrResult.updateResult(new MinMaxInfo<>(minVal, bottomTime)); +// // update TP +// MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); +// maxValueAggrResult.updateResult(new MinMaxInfo<>(maxVal, topTime)); +// } +// } +// } +// } +// +// public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long curStartTime, long curEndTime) +// { +// return false; +// // long minT = chunkSuit4Tri.chunkMetadata.getStartTime(); +// // long maxT = chunkSuit4Tri.chunkMetadata.getEndTime(); +// // return minT >= curStartTime && maxT < curEndTime; +// } +// +// @Override +// public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime) +// throws IOException { +// throw new IOException("no implemented"); +// } +// +// @Override +// public List<AggregateResult> calcResult(long curStartTime, long curEndTime) +// throws IOException, QueryProcessException { +// throw new IOException("no implemented"); +// } +// } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java index abf98948ec6..6020b07a7a6 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java @@ -19,12 +19,13 @@ package org.apache.iotdb.db.query.dataset.groupby; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.query.aggregation.AggregateResult; -import org.apache.iotdb.db.query.aggregation.impl.MaxValueAggrResult; import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; @@ -41,6 +42,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; +import org.apache.iotdb.tsfile.read.common.IOMonitor2; import org.apache.iotdb.tsfile.read.filter.GroupByFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.page.PageReader; @@ -59,6 +61,8 @@ import java.util.Set; public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA"); // Aggregate result buffer of this path @@ -69,6 +73,8 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { private Filter timeFilter; + private final int N1; // 分桶数 + public LocalGroupByExecutorTri_MinMax( PartialPath path, Set<String> allSensors, @@ -101,6 +107,12 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { fileFilter, ascending); + GroupByFilter groupByFilter = (GroupByFilter) timeFilter; + long startTime = groupByFilter.getStartTime(); + long endTime = groupByFilter.getEndTime(); + long interval = groupByFilter.getInterval(); + N1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); // 分桶数 + // unpackAllOverlappedFilesToTimeSeriesMetadata try { // : this might be bad to load all chunk metadata at first @@ -261,49 +273,84 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { public List<AggregateResult> calcResult( long curStartTime, long curEndTime, long startTime, long endTime, long interval) throws IOException { + // 这里用calcResult一次返回所有buckets结果(可以把MinValueAggrResult的value设为string类型, + // 那就把所有buckets结果作为一个string返回。这样的话返回的[t]是没有意义的,只取valueString) + // 而不是像MinMax那样在nextWithoutConstraintTri_MinMax()里调用calcResult每次计算一个bucket + StringBuilder series = new StringBuilder(); + // clear result cache for (AggregateResult result : results) { result.reset(); } - // long start = System.nanoTime(); - getCurrentChunkListFromFutureChunkList(curStartTime, curEndTime); - // IOMonitor2.addMeasure(Operation.M4_LSM_MERGE_M4_TIME_SPAN, System.nanoTime() - start); + // 全局首点(对于MinMax来说全局首尾点只是输出不会影响到其它桶的采点) + series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(","); + + // Assume no empty buckets + for (int b = 0; b < N1; b++) { + long localCurStartTime = startTime + (b) * interval; + long localCurEndTime = startTime + (b + 1) * interval; + + getCurrentChunkListFromFutureChunkList(localCurStartTime, localCurEndTime); + + if (currentChunkList.size() == 0) { + // System.out.println("MinMax empty currentChunkList"); // TODO debug + series + .append("null") + .append("[") + .append("null") + .append("]") + .append(",") + .append("null") + .append("[") + .append("null") + .append("]") + .append(","); + continue; + } - if (currentChunkList.size() == 0) { - System.out.println("MinMax empty currentChunkList"); // TODO debug - return results; + calculateMinMax(currentChunkList, localCurStartTime, localCurEndTime, series); } - // start = System.nanoTime(); - calculateMinMax(currentChunkList, curStartTime, curEndTime); - // IOMonitor2.addMeasure(Operation.M4_LSM_FP, System.nanoTime() - start); + // 全局尾点 + series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(","); + + MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); + minValueAggrResult.updateResult(new MinMaxInfo<>(series.toString(), 0)); return results; } private void calculateMinMax( - List<ChunkSuit4Tri> currentChunkList, long curStartTime, long curEndTime) throws IOException { + List<ChunkSuit4Tri> currentChunkList, + long curStartTime, + long curEndTime, + StringBuilder series) + throws IOException { + double minValue = Double.MAX_VALUE; + long bottomTime = -1; + double maxValue = -Double.MAX_VALUE; // Double.MIN_VALUE is positive so do not use it!!! + long topTime = -1; + for (ChunkSuit4Tri chunkSuit4Tri : currentChunkList) { Statistics statistics = chunkSuit4Tri.chunkMetadata.getStatistics(); if (canUseStatistics(chunkSuit4Tri, curStartTime, curEndTime)) { // update BP - MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); - minValueAggrResult.updateResult( - new MinMaxInfo<>(statistics.getMinValue(), statistics.getBottomTimestamp())); + double chunkMinValue = (double) statistics.getMinValue(); + if (chunkMinValue < minValue) { + minValue = chunkMinValue; + bottomTime = statistics.getBottomTimestamp(); + } // update TP - MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); - maxValueAggrResult.updateResult( - new MinMaxInfo<>(statistics.getMaxValue(), statistics.getTopTimestamp())); + double chunkMaxValue = (double) statistics.getMaxValue(); + if (chunkMaxValue > maxValue) { + maxValue = chunkMaxValue; + topTime = statistics.getTopTimestamp(); + } } else { // cannot use statistics directly - double minVal = Double.MAX_VALUE; - long bottomTime = -1; - double maxVal = -Double.MAX_VALUE; // Double.MIN_VALUE is positive so do not use it!!! - long topTime = -1; - // 1. load page data if it hasn't been loaded TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); if (dataType != TSDataType.DOUBLE) { @@ -323,6 +370,7 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { PageReader pageReader = chunkSuit4Tri.pageReader; int i; for (i = chunkSuit4Tri.lastReadPos; i < count; i++) { + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; long timestamp = pageReader.timeBuffer.getLong(i * 8); if (timestamp < curStartTime) { // 2. read from lastReadPos until the first point fallen within this bucket (if it @@ -337,33 +385,51 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { // 4. update MinMax by traversing points fallen within this bucket ByteBuffer valueBuffer = pageReader.valueBuffer; double v = valueBuffer.getDouble(pageReader.timeBufferLength + i * 8); - if (v < minVal) { - minVal = v; + if (v < minValue) { + minValue = v; bottomTime = timestamp; } - if (v > maxVal) { - maxVal = v; + if (v > maxValue) { + maxValue = v; topTime = timestamp; } } } - // clear for heap space - if (i >= count) { - // 代表这个chunk已经读完了,后面的bucket不会再用到,所以现在就可以清空内存的page - // 而不是等到下一个bucket的时候再清空,因为有可能currentChunkList里chunks太多,page点同时存在太多,heap space不够 - chunkSuit4Tri.pageReader = null; - } - // 4. update MinMax by traversing points fallen within this bucket - if (topTime >= 0) { - // update BP - MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); - minValueAggrResult.updateResult(new MinMaxInfo<>(minVal, bottomTime)); - // update TP - MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); - maxValueAggrResult.updateResult(new MinMaxInfo<>(maxVal, topTime)); - } + // // clear for heap space + // if (i >= count) { + // // 代表这个chunk已经读完了,后面的bucket不会再用到,所以现在就可以清空内存的page + // // 而不是等到下一个bucket的时候再清空,因为有可能currentChunkList里chunks太多,page点同时存在太多,heap space不够 + // chunkSuit4Tri.pageReader = null; + // } } } + // 记录结果 + if (topTime >= 0) { + series + .append(minValue) + .append("[") + .append(bottomTime) + .append("]") + .append(",") + .append(maxValue) + .append("[") + .append(topTime) + .append("]") + .append(","); + } else { + // empty bucket although statistics cover + series + .append("null") + .append("[") + .append("null") + .append("]") + .append(",") + .append("null") + .append("[") + .append("null") + .append("]") + .append(","); + } } public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long curStartTime, long curEndTime) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMaxPreselection.java similarity index 97% copy from server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java copy to server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMaxPreselection.java index abf98948ec6..555e775fd67 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMaxPreselection.java @@ -41,6 +41,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; +import org.apache.iotdb.tsfile.read.common.IOMonitor2; import org.apache.iotdb.tsfile.read.filter.GroupByFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.page.PageReader; @@ -57,7 +58,7 @@ import java.util.List; import java.util.ListIterator; import java.util.Set; -public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { +public class LocalGroupByExecutorTri_MinMaxPreselection implements GroupByExecutor { private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA"); @@ -69,7 +70,7 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { private Filter timeFilter; - public LocalGroupByExecutorTri_MinMax( + public LocalGroupByExecutorTri_MinMaxPreselection( PartialPath path, Set<String> allSensors, TSDataType dataType, @@ -271,7 +272,7 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { // IOMonitor2.addMeasure(Operation.M4_LSM_MERGE_M4_TIME_SPAN, System.nanoTime() - start); if (currentChunkList.size() == 0) { - System.out.println("MinMax empty currentChunkList"); // TODO debug + // System.out.println("MinMax empty currentChunkList"); // TODO debug return results; } @@ -323,6 +324,7 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { PageReader pageReader = chunkSuit4Tri.pageReader; int i; for (i = chunkSuit4Tri.lastReadPos; i < count; i++) { + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; long timestamp = pageReader.timeBuffer.getLong(i * 8); if (timestamp < curStartTime) { // 2. read from lastReadPos until the first point fallen within this bucket (if it diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_M4.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_M4.java index 0c30926deaa..9fe7855351d 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_M4.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_M4.java @@ -109,7 +109,7 @@ public class MyTest_M4 { prepareData1(); String res = "0.0[0],1.0[20],15.0[2],5.0[1],1.0[20],8.0[25],8.0[25],8.0[25],8.0[25],3.0[54]," - + "3.0[54],3.0[54],3.0[54],null,null,null[null],null[null],200.0[200],"; + + "3.0[54],3.0[54],3.0[54],null[null],null[null],null[null],null[null],200.0[200],"; try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { @@ -238,7 +238,7 @@ public class MyTest_M4 { prepareData3_2(); String res = - "0.0[0],1.0[10],10.0[2],5.0[1],5.0[20],null,null,null[null],null[null],4.0[72]," + "0.0[0],1.0[10],10.0[2],5.0[1],5.0[20],null[null],null[null],null[null],null[null],4.0[72]," + "20.0[62],15.0[60],4.0[72],1.0[90],11.0[80],11.0[80],1.0[90],200.0[200],"; try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java index a6f76b440ed..b6bcc0c6a0e 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java @@ -103,7 +103,8 @@ public class MyTest_MinMax { @Test public void test1() throws Exception { prepareData1(); - String res = "0.0[0],1.0[20],15.0[2],8.0[25],8.0[25],3.0[54],3.0[54],null,null,200.0[200],"; + String res = + "0.0[0],1.0[20],15.0[2],8.0[25],8.0[25],3.0[54],3.0[54],null[null],null[null],200.0[200],"; // 0,P1v[P1t],BPv[t]ofBucket1,TPv[t]ofBucket1,BPv[t]ofBucket2,TPv[t]ofBucket2,...,Pnv[Pnt] try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); @@ -232,7 +233,8 @@ public class MyTest_MinMax { public void test3_2() { prepareData3_2(); - String res = "0.0[0],1.0[10],10.0[2],null,null,4.0[72],20.0[62],1.0[90],11.0[80],200.0[200],"; + String res = + "0.0[0],1.0[10],10.0[2],null[null],null[null],4.0[72],20.0[62],1.0[90],11.0[80],200.0[200],"; try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) {
