This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/LTS-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5b4209916ce0495c2b5650c00e8e8713a4702fc1 Author: Lei Rui <[email protected]> AuthorDate: Sun Jan 28 15:23:46 2024 +0800 add iteration --- .../groupby/GroupByWithoutValueFilterDataSet.java | 12 +- .../groupby/LocalGroupByExecutorTri_ILTS.java | 316 +++++++++++++++++++++ .../groupby/LocalGroupByExecutorTri_LTTB.java | 151 +--------- .../iotdb/db/integration/tri/MyTest_ILTS.java | 248 ++++++++++++++++ 4 files changed, 569 insertions(+), 158 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java index 913158a5021..c349d9dc3c9 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java @@ -142,14 +142,9 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { } // } else if (CONFIG.getEnableTri().equals("M4LTTB")) { // // TODO - else if (CONFIG.getEnableTri().equals("LTTB")) { - // TODO + else if (CONFIG.getEnableTri().equals("LTTB") || CONFIG.getEnableTri().equals("ILTS")) { return nextWithoutConstraintTri_LTTB(); - } - // } else if (CONFIG.getEnableTri().equals("ILTS")) { - // // TODO - // } - else { + } else { return nextWithoutConstraint_raw(); } } @@ -494,12 +489,11 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { return new LocalGroupByExecutor( path, allSensors, dataType, context, timeFilter, fileFilter, ascending); } else if (CONFIG.getEnableTri().equals("LTTB")) { - // TODO return new LocalGroupByExecutorTri_LTTB( path, allSensors, dataType, context, timeFilter, fileFilter, ascending); } else if (CONFIG.getEnableTri().equals("ILTS")) { // TODO - return new LocalGroupByExecutor( + return new LocalGroupByExecutorTri_ILTS( path, allSensors, dataType, context, timeFilter, 
fileFilter, ascending); } // deprecated below diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java new file mode 100644 index 00000000000..73f9fd395d7 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.query.dataset.groupby; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.filter.TsFileFilter; +import org.apache.iotdb.db.query.reader.series.SeriesReader; +import org.apache.iotdb.db.utils.FileLoaderUtils; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; +import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; +import org.apache.iotdb.tsfile.read.common.IOMonitor2; +import org.apache.iotdb.tsfile.read.filter.GroupByFilter; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.page.PageReader; +import org.apache.iotdb.tsfile.utils.Pair; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class LocalGroupByExecutorTri_ILTS implements GroupByExecutor { + + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA"); + + // Aggregate result buffer of this path + private final 
List<AggregateResult> results = new ArrayList<>(); + + // private List<ChunkSuit4Tri> currentChunkList; + + // private final List<ChunkSuit4Tri> futureChunkList = new ArrayList<>(); + + // keys: 0,1,...,(int) Math.floor((endTime * 1.0 - startTime) / interval)-1 + private final Map<Integer, List<ChunkSuit4Tri>> splitChunkList = new HashMap<>(); + + private final long p1t = CONFIG.getP1t(); + private final double p1v = CONFIG.getP1v(); + private final long pnt = CONFIG.getPnt(); + private final double pnv = CONFIG.getPnv(); + + private long lt = p1t; + private double lv = p1v; + + private final int N1; // 分桶数 + + private static final int numIterations = 8; + + private Filter timeFilter; + + public LocalGroupByExecutorTri_ILTS( + PartialPath path, + Set<String> allSensors, + TSDataType dataType, + QueryContext context, + Filter timeFilter, + TsFileFilter fileFilter, + boolean ascending) + throws StorageEngineException, QueryProcessException { + // long start = System.nanoTime(); + + // get all data sources + QueryDataSource queryDataSource = + QueryResourceManager.getInstance().getQueryDataSource(path, context, this.timeFilter); + + // update filter by TTL + this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); + + SeriesReader seriesReader = + new SeriesReader( + path, + allSensors, + // fix bug: here use the aggregation type as the series data type, + // not using pageReader.getAllSatisfiedPageData is ok + dataType, + context, + queryDataSource, + timeFilter, + null, + fileFilter, + ascending); + + // unpackAllOverlappedFilesToTimeSeriesMetadata + try { + // : this might be bad to load all chunk metadata at first + List<ChunkSuit4Tri> futureChunkList = + new ArrayList<>(seriesReader.getAllChunkMetadatas4Tri()); // no need sort here + // arrange futureChunkList into each bucket + GroupByFilter groupByFilter = (GroupByFilter) timeFilter; + long startTime = groupByFilter.getStartTime(); + long endTime = groupByFilter.getEndTime(); + long interval = 
groupByFilter.getInterval(); + N1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); // 分桶数 + for (ChunkSuit4Tri chunkSuit4Tri : futureChunkList) { + ChunkMetadata chunkMetadata = chunkSuit4Tri.chunkMetadata; + long chunkMinTime = chunkMetadata.getStartTime(); + long chunkMaxTime = chunkMetadata.getEndTime(); + int idx1 = (int) Math.floor((chunkMinTime - startTime) * 1.0 / interval); + int idx2 = (int) Math.floor((chunkMaxTime - startTime) * 1.0 / interval); + idx2 = (int) Math.min(idx2, N1 - 1); + for (int i = idx1; i <= idx2; i++) { + splitChunkList.computeIfAbsent(i, k -> new ArrayList<>()); + splitChunkList.get(i).add(chunkSuit4Tri); + } + } + + } catch (IOException e) { + throw new QueryProcessException(e.getMessage()); + } + + // IOMonitor2.addMeasure(Operation.M4_LSM_INIT_LOAD_ALL_CHUNKMETADATAS, System.nanoTime() - + // start); + } + + @Override + public void addAggregateResult(AggregateResult aggrResult) { + results.add(aggrResult); + } + + @Override + public List<AggregateResult> calcResult( + long curStartTime, long curEndTime, long startTime, long endTime, long interval) + throws IOException { + // 这里用calcResult一次返回所有buckets结果(可以把MinValueAggrResult的value设为string类型, + // 那就把所有buckets结果作为一个string返回。这样的话返回的[t]是没有意义的,只取valueString) + // 而不是像MinMax那样在nextWithoutConstraintTri_MinMax()里调用calcResult每次计算一个bucket + StringBuilder series_final = new StringBuilder(); + + // clear result cache + for (AggregateResult result : results) { + result.reset(); + } + + long[] lastIter_t = new long[N1]; // N1不包括全局首尾点 + double[] lastIter_v = new double[N1]; // N1不包括全局首尾点 + for (int num = 0; num < numIterations; num++) { + StringBuilder series = new StringBuilder(); + // 全局首点 + series.append(p1v).append("[").append(p1t).append("]").append(","); + // 遍历分桶 Assume no empty buckets + for (int b = 0; b < N1; b++) { + long rt = 0; // must initialize as zero, because may be used as sum for average + double rv = 0; // must initialize as zero, because may be used as sum for 
average + // 计算右边桶的固定点 + if (b == N1 - 1) { // 最后一个桶 + // 全局尾点 + rt = pnt; + rv = pnv; + } else { // 不是最后一个桶 + if (num == 0) { // 是第一次迭代的话,就使用右边桶的平均点 + // ========计算右边桶的平均点======== + List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(b + 1); + long rightStartTime = startTime + (b + 1) * interval; + long rightEndTime = startTime + (b + 2) * interval; + int cnt = 0; + // 遍历所有与右边桶overlap的chunks + for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) { + TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); + if (dataType != TSDataType.DOUBLE) { + throw new UnSupportedDataTypeException(String.valueOf(dataType)); + } + // 1. load page data if it hasn't been loaded + if (chunkSuit4Tri.pageReader == null) { + chunkSuit4Tri.pageReader = + FileLoaderUtils.loadPageReaderList4CPV( + chunkSuit4Tri.chunkMetadata, this.timeFilter); + // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS + // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS! + } + // 2. 
计算平均点 + PageReader pageReader = chunkSuit4Tri.pageReader; + for (int j = 0; j < chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) { + long timestamp = pageReader.timeBuffer.getLong(j * 8); + if (timestamp < rightStartTime) { + continue; + } else if (timestamp >= rightEndTime) { + break; + } else { // rightStartTime<=t<rightEndTime + ByteBuffer valueBuffer = pageReader.valueBuffer; + double v = valueBuffer.getDouble(pageReader.timeBufferLength + j * 8); + rt += timestamp; + rv += v; + cnt++; + } + } + } + rt = rt / cnt; + rv = rv / cnt; + } else { // 不是第一次迭代也不是最后一个桶的话,就使用上一轮迭代右边桶的采样点 + rt = lastIter_t[b + 1]; + rv = lastIter_v[b + 1]; + } + } + // ========找到当前桶内距离lr连线最远的点======== + double maxArea = -1; + long select_t = -1; + double select_v = -1; + List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(b); + long localCurStartTime = startTime + (b) * interval; + long localCurEndTime = startTime + (b + 1) * interval; + // 遍历所有与当前桶overlap的chunks + for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) { + TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); + if (dataType != TSDataType.DOUBLE) { + throw new UnSupportedDataTypeException(String.valueOf(dataType)); + } + // load page data if it hasn't been loaded + if (chunkSuit4Tri.pageReader == null) { + chunkSuit4Tri.pageReader = + FileLoaderUtils.loadPageReaderList4CPV( + chunkSuit4Tri.chunkMetadata, this.timeFilter); + // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS + // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS! 
+ } + PageReader pageReader = chunkSuit4Tri.pageReader; + for (int j = 0; j < chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) { + long timestamp = pageReader.timeBuffer.getLong(j * 8); + if (timestamp < localCurStartTime) { + continue; + } else if (timestamp >= localCurEndTime) { + break; + } else { // localCurStartTime<=t<localCurEndTime + ByteBuffer valueBuffer = pageReader.valueBuffer; + double v = valueBuffer.getDouble(pageReader.timeBufferLength + j * 8); + double area = IOMonitor2.calculateTri(lt, lv, timestamp, v, rt, rv); + if (area > maxArea) { + maxArea = area; + select_t = timestamp; + select_v = v; + } + } + } + } + // 记录结果 + series.append(select_v).append("[").append(select_t).append("]").append(","); + + // 更新lt,lv + // 下一个桶自然地以select_t, select_v作为左桶固定点 + lt = select_t; + lv = select_v; + // 记录本轮迭代本桶选点 + lastIter_t[b] = select_t; + lastIter_v[b] = select_v; + } // 遍历分桶结束 + // 全局尾点 + series.append(pnv).append("[").append(pnt).append("]").append(","); + + System.out.println(series); + } // end Iterations + + // 全局首点 + series_final.append(p1v).append("[").append(p1t).append("]").append(","); + for (int i = 0; i < lastIter_t.length; i++) { + series_final.append(lastIter_v[i]).append("[").append(lastIter_t[i]).append("]").append(","); + } + // 全局尾点 + series_final.append(pnv).append("[").append(pnt).append("]").append(","); + MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); + minValueAggrResult.updateResult(new MinMaxInfo<>(series_final.toString(), 0)); + + return results; + } + + @Override + public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime) + throws IOException { + throw new IOException("no implemented"); + } + + @Override + public List<AggregateResult> calcResult(long curStartTime, long curEndTime) + throws IOException, QueryProcessException { + throw new IOException("no implemented"); + } +} diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java index 9ec058e2059..be84be70c38 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java @@ -26,7 +26,6 @@ import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.query.aggregation.AggregateResult; -import org.apache.iotdb.db.query.aggregation.impl.MaxValueAggrResult; import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; @@ -37,7 +36,6 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; -import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; import org.apache.iotdb.tsfile.read.common.IOMonitor2; import org.apache.iotdb.tsfile.read.filter.GroupByFilter; @@ -78,7 +76,6 @@ public class LocalGroupByExecutorTri_LTTB implements GroupByExecutor { private long lt = p1t; private double lv = p1v; - // private boolean[] notEmptyBucket; private final int N1; // 分桶数 @@ -119,8 +116,8 @@ public class LocalGroupByExecutorTri_LTTB implements GroupByExecutor { // unpackAllOverlappedFilesToTimeSeriesMetadata try { // : this might be bad to load all chunk metadata at first - List<ChunkSuit4Tri> futureChunkList = new ArrayList<>(); - 
futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri()); // no need sort here + List<ChunkSuit4Tri> futureChunkList = + new ArrayList<>(seriesReader.getAllChunkMetadatas4Tri()); // no need sort here // arrange futureChunkList into each bucket GroupByFilter groupByFilter = (GroupByFilter) timeFilter; long startTime = groupByFilter.getStartTime(); @@ -153,39 +150,6 @@ public class LocalGroupByExecutorTri_LTTB implements GroupByExecutor { results.add(aggrResult); } - // private void getCurrentChunkListFromFutureChunkList(long curStartTime, long curEndTime) { - // // IOMonitor2.M4_LSM_status = Operation.M4_LSM_MERGE_M4_TIME_SPAN; - // - // // empty currentChunkList - // currentChunkList = new ArrayList<>(); - // - // // iterate futureChunkList - // ListIterator<ChunkSuit4Tri> itr = futureChunkList.listIterator(); - // while (itr.hasNext()) { - // ChunkSuit4Tri chunkSuit4Tri = itr.next(); - // ChunkMetadata chunkMetadata = chunkSuit4Tri.chunkMetadata; - // long chunkMinTime = chunkMetadata.getStartTime(); - // long chunkMaxTime = chunkMetadata.getEndTime(); - // if (chunkMaxTime < curStartTime) { - // // the chunk falls on the left side of the current M4 interval Ii - // itr.remove(); - // } else if (chunkMinTime >= curEndTime) { - // // the chunk falls on the right side of the current M4 interval Ii, - // // and since futureChunkList is ordered by the startTime of chunkMetadata, - // // the loop can be terminated early. 
- // break; - // } else if (chunkMaxTime < curEndTime) { - // // this chunk is not related to buckets later - // currentChunkList.add(chunkSuit4Tri); - // itr.remove(); - // } else { - // // this chunk is overlapped with the right border of the current bucket - // currentChunkList.add(chunkSuit4Tri); - // // still keep it in the futureChunkList - // } - // } - // } - @Override public List<AggregateResult> calcResult( long curStartTime, long curEndTime, long startTime, long endTime, long interval) @@ -313,120 +277,9 @@ public class LocalGroupByExecutorTri_LTTB implements GroupByExecutor { MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); minValueAggrResult.updateResult(new MinMaxInfo<>(series.toString(), 0)); - // int idx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval); - // System.out.println("idx=" + idx); - // List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(idx); - // if (chunkSuit4TriList != null) { - // for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) { - // System.out.println(chunkSuit4Tri.chunkMetadata.getStartTime()); - // } - // } - - // long start = System.nanoTime(); - // getCurrentChunkListFromFutureChunkList(curStartTime, curEndTime); - // IOMonitor2.addMeasure(Operation.M4_LSM_MERGE_M4_TIME_SPAN, System.nanoTime() - start); - - // if (currentChunkList.size() == 0) { - // return results; - // } - - // start = System.nanoTime(); - // calculateMinMax(currentChunkList, curStartTime, curEndTime); - // IOMonitor2.addMeasure(Operation.M4_LSM_FP, System.nanoTime() - start); - return results; } - private void calculateMinMax( - List<ChunkSuit4Tri> currentChunkList, long curStartTime, long curEndTime) throws IOException { - for (ChunkSuit4Tri chunkSuit4Tri : currentChunkList) { - - Statistics statistics = chunkSuit4Tri.chunkMetadata.getStatistics(); - - if (canUseStatistics(chunkSuit4Tri, curStartTime, curEndTime)) { - // update BP - MinValueAggrResult minValueAggrResult = (MinValueAggrResult) 
results.get(0); - minValueAggrResult.updateResult( - new MinMaxInfo<>(statistics.getMinValue(), statistics.getBottomTimestamp())); - // update TP - MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); - maxValueAggrResult.updateResult( - new MinMaxInfo<>(statistics.getMaxValue(), statistics.getTopTimestamp())); - } else { // cannot use statistics directly - - Comparable<Object> minVal = null; - long bottomTime = -1; - Comparable<Object> maxVal = null; - long topTime = -1; - - // 1. load page data if it hasn't been loaded - if (chunkSuit4Tri.pageReader == null) { - chunkSuit4Tri.pageReader = - FileLoaderUtils.loadPageReaderList4CPV(chunkSuit4Tri.chunkMetadata, this.timeFilter); - // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, - // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. - // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE - // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS - // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS! - } - - int count = chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); - PageReader pageReader = chunkSuit4Tri.pageReader; - for (int i = chunkSuit4Tri.lastReadPos; i < count; i++) { - long timestamp = pageReader.timeBuffer.getLong(i * 8); - if (timestamp < curStartTime) { - // 2. read from lastReadPos until the first point fallen within this bucket (if it - // exists) - continue; - } else if (timestamp >= curEndTime) { - // 3. traverse until the first point fallen right this bucket, also remember to update - // lastReadPos - chunkSuit4Tri.lastReadPos = i; - break; - } else { - // 4. 
update MinMax by traversing points fallen within this bucket - ByteBuffer valueBuffer = pageReader.valueBuffer; - TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); - Object v; - switch (dataType) { - // case INT64: - // v = valueBuffer.getLong(pageReader.timeBufferLength + i * 8); - // break; - case DOUBLE: - v = valueBuffer.getDouble(pageReader.timeBufferLength + i * 8); - break; - default: - throw new UnSupportedDataTypeException(String.valueOf(dataType)); - } - if (minVal == null || minVal.compareTo(v) > 0) { - minVal = (Comparable<Object>) v; - bottomTime = timestamp; - } - if (maxVal == null || maxVal.compareTo(v) < 0) { - maxVal = (Comparable<Object>) v; - topTime = timestamp; - } - } - } - // 4. update MinMax by traversing points fallen within this bucket - if (minVal != null) { - // update BP - MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); - minValueAggrResult.updateResult(new MinMaxInfo<>(minVal, bottomTime)); - // update TP - MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); - maxValueAggrResult.updateResult(new MinMaxInfo<>(maxVal, topTime)); - } - } - } - } - - public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long curStartTime, long curEndTime) { - long TP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getTopTimestamp(); - long BP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getBottomTimestamp(); - return TP_t >= curStartTime && TP_t < curEndTime && BP_t >= curStartTime && BP_t < curEndTime; - } - @Override public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime) throws IOException { diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java new file mode 100644 index 00000000000..d2381e6fa75 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.integration.tri; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.jdbc.Config; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Locale; + +import static org.junit.Assert.fail; + +public class MyTest_ILTS { + + /* + * Sql format: SELECT min_value(s0), max_value(s0) FROM root.xx group by ([tqs,tqe),IntervalLength). + * Requirements: + * (1) Don't change the sequence of the above two aggregates + * (2) Assume each chunk has only one page. + * (3) Assume all chunks are sequential and no deletes. 
+ * (4) Assume plain encoding, UNCOMPRESSED, Long or Double data type, no compaction + * (5) Assume no empty bucket + */ + private static final String TIMESTAMP_STR = "Time"; + + private static String[] creationSqls = + new String[] { + "SET STORAGE GROUP TO root.vehicle.d0", + "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=DOUBLE, ENCODING=PLAIN", + // IoTDB int data type does not support plain encoding, so use long data type + }; + + private final String d0s0 = "root.vehicle.d0.s0"; + + private static final String insertTemplate = + "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%f)"; + + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + + @Before + public void setUp() throws Exception { + TSFileDescriptor.getInstance().getConfig().setTimeEncoder("PLAIN"); + config.setTimestampPrecision("ms"); + config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION); + + config.setEnableTri("ILTS"); + + config.setEnableCPV(false); + TSFileDescriptor.getInstance().getConfig().setEnableMinMaxLSM(false); + TSFileDescriptor.getInstance().getConfig().setUseStatistics(false); + + EnvironmentUtils.envSetUp(); + Class.forName(Config.JDBC_DRIVER_NAME); + } + + @After + public void tearDown() throws Exception { + EnvironmentUtils.cleanEnv(); + } + + @Test + public void test3_2() { + prepareData3_2(); + + String res = "5.0[1],10.0[2],2.0[40],5.0[55],20.0[62],1.0[90],7.0[102],"; + try (Connection connection = + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + boolean hasResultSet = + statement.execute( + "SELECT min_value(s0)" + // TODO not real min_value here, actually controlled by enableTri + + " FROM root.vehicle.d0 group by ([2,102),20ms)"); + // (102-2)/(7-2)=20ms + // note keep no empty buckets + + Assert.assertTrue(hasResultSet); + try (ResultSet resultSet = statement.getResultSet()) { + int i = 0; + while (resultSet.next()) { + String ans 
= resultSet.getString(2); + // for LTTB all results are in the value string of MinValueAggrResult + // 因此对于LTTB来说,MinValueAggrResult的[t]也无意义 + ans = ans.substring(0, ans.length() - 3); + System.out.println(ans); + Assert.assertEquals(res, ans); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private static void prepareData3_2() { + // data: + // https://user-images.githubusercontent.com/33376433/152003603-6b4e7494-00ff-47e4-bf6e-cab3c8600ce2.png + // slightly modified + try (Connection connection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + + for (String sql : creationSqls) { + statement.execute(sql); + } + + long[] t = new long[] {1, 2, 10, 20, 22, 30, 40, 55, 60, 62, 65, 70, 72, 80, 90, 102}; + double[] v = new double[] {5, 10, 1, 5, 4, 8, 2, 5, 15, 20, 8, 18, 4, 11, 1, 7}; + config.setP1t(t[0]); + config.setP1v(v[0]); + config.setPnt(t[t.length - 1]); + config.setPnv(v[v.length - 1]); + + for (int i = 0; i < t.length; i++) { + statement.execute(String.format(Locale.ENGLISH, insertTemplate, t[i], v[i])); + if ((i + 1) % 4 == 0) { + statement.execute("FLUSH"); + } + } + statement.execute("FLUSH"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void test3() { + prepareData3(); + + String res = + "-1.2079272[0],1.101946[200],-0.523204[300],0.145359[500],-1.014322[700]," + + "0.532565[900],-0.122525[1200],-0.676077[1300],0.809559[1500],0.315869[1800]," + + "-0.413534[1900],-0.0211206[2100],"; + try (Connection connection = + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + boolean hasResultSet = + statement.execute( + "SELECT min_value(s0)" + // TODO not real min_value here, actually controlled by enableTri + + " FROM root.vehicle.d0 group by ([100,2100),200ms)"); + // 
(tn-t2)/(nout-2)=(2100-100)/(12-2)=2000/10=200 + + Assert.assertTrue(hasResultSet); + try (ResultSet resultSet = statement.getResultSet()) { + int i = 0; + while (resultSet.next()) { + String ans = resultSet.getString(2); + // for LTTB all results are in the value string of MinValueAggrResult + // 因此对于LTTB来说,MinValueAggrResult的[t]也无意义 + ans = ans.substring(0, ans.length() - 3); + System.out.println(ans); + Assert.assertEquals(res, ans); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private static void prepareData3() { + try (Connection connection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + + for (String sql : creationSqls) { + statement.execute(sql); + } + + int[] t = + new int[] { + 0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500, + 1600, 1700, 1800, 1900, 2000, 2100 + }; + double[] v = + new double[] { + -1.2079272, + -0.01120245, + 1.1019456, + -0.52320362, + -0.35970289, + 0.1453591, + -0.45947892, + -1.0143219, + 0.81760821, + 0.5325646, + -0.29532424, + -0.1469335, + -0.12252526, + -0.67607713, + -0.16967308, + 0.8095585, + -0.78541944, + 0.03221141, + 0.31586886, + -0.41353356, + -0.21019539, + -0.0211206 + }; + config.setP1t(t[0]); + config.setP1v(v[0]); + config.setPnt(t[t.length - 1]); + config.setPnv(v[v.length - 1]); + + for (int i = 0; i < t.length; i++) { + statement.execute(String.format(Locale.ENGLISH, insertTemplate, t[i], v[i])); + if ((i + 1) % 4 == 0) { + statement.execute("FLUSH"); + } + } + statement.execute("FLUSH"); + } catch (Exception e) { + e.printStackTrace(); + } + } +}
