This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/LTS-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e252a6f12a29349c5c0c76d33361026210c3130e Author: Lei Rui <[email protected]> AuthorDate: Sun Jan 28 14:46:22 2024 +0800 LTTB --- .../dataset/groupby/GroupByEngineDataSet.java | 3 +- .../groupby/GroupByWithoutValueFilterDataSet.java | 57 ++- .../groupby/LocalGroupByExecutorTri_LTTB.java | 441 +++++++++++++++++++++ .../groupby/LocalGroupByExecutorTri_MinMax.java | 31 +- .../{MyTest_MinMaxLTTB.java => MyTest_LTTB.java} | 160 ++++---- .../iotdb/db/integration/tri/MyTest_MinMax.java | 92 ++--- .../db/integration/tri/MyTest_MinMaxLTTB.java | 56 --- .../iotdb/tsfile/read/common/ChunkSuit4Tri.java | 5 +- 8 files changed, 639 insertions(+), 206 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java index 8feafc55602..da8795fbbec 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java @@ -129,7 +129,8 @@ public abstract class GroupByEngineDataSet extends QueryDataSet { curStartTime += curSlidingStep; } // This is an open interval , [0-100) - if (curStartTime + interval >= endTime) { + if (curStartTime + interval > endTime) { + // 注意没有等号!因为左闭右开 // + interval to make the last bucket complete // e.g, T=11,nout=3,interval=floor(11/3)=3, // [0,3),[3,6),[6,9), no need incomplete [9,11) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java index 9bb07093067..913158a5021 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java @@ -142,8 +142,10 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { } // } else if (CONFIG.getEnableTri().equals("M4LTTB")) { // // TODO - // } else if (CONFIG.getEnableTri().equals("LTTB")) { - // // TODO + else if (CONFIG.getEnableTri().equals("LTTB")) { + // TODO + return nextWithoutConstraintTri_LTTB(); + } // } else if (CONFIG.getEnableTri().equals("ILTS")) { // // TODO // } @@ -152,6 +154,40 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { } } + public RowRecord nextWithoutConstraintTri_LTTB() throws IOException { + RowRecord record; + try { + GroupByExecutor executor = null; + for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) { + executor = pathToExecutorEntry.getValue(); // assume only one series here + break; + } + + // concat results into a string + record = new RowRecord(0); + StringBuilder series = new StringBuilder(); + + // all bucket results as string in value of MinValueAggrResult + List<AggregateResult> aggregations = + executor.calcResult(startTime, startTime + interval, startTime, endTime, interval); + series.append(aggregations.get(0).getResult()); + + // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString())) + record.addField(series, TSDataType.MIN_MAX_INT64); + + } catch (QueryProcessException e) { + logger.error("GroupByWithoutValueFilterDataSet execute has error", e); + throw new IOException(e.getMessage(), e); + } + + // in the end, make the next hasNextWithoutConstraint() false + // as we already fetch all here + curStartTime = endTime; + hasCachedTimeInterval = false; + + return record; + } + public RowRecord nextWithoutConstraintTri_MinMaxLTTB() throws IOException { int divide = 2; // one LTTB bucket corresponds to rps/2 MinMax buckets @@ -171,7 +207,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { try { // First step: get the MinMax preselection result List<Long> times = new ArrayList<>(); - List<Object> values = new ArrayList<>(); + List<Double> values = new ArrayList<>(); GroupByExecutor executor = null; for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) { executor = pathToExecutorEntry.getValue(); // assume only one series here @@ -179,7 +215,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { } for (long localCurStartTime = startTime; localCurStartTime + interval <= endTime; - // 注意有等号! + // 注意有等号!因为左闭右开 // + interval to make the last bucket complete // e.g, T=11,nout=3,interval=floor(11/3)=3, // [0,3),[3,6),[6,9), no need incomplete [9,11) @@ -199,7 +235,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { values.add(null); } else { times.add(minMaxInfo.timestamp); - values.add(minMaxInfo.val); + values.add((Double) minMaxInfo.val); } } } @@ -265,7 +301,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { // 一个LTTB桶里有rps个MinMax预选点(包含重复和null) if (times.get(j) != null) { long t = times.get(j); - double v = (double) values.get(j); // TODO + double v = values.get(j); double area = IOMonitor2.calculateTri(lt, lv, t, v, rt, rv); // System.out.printf("curr=%d,t=%d,area=%f,lt=%d%n", currentBucket, t, area, // lt); @@ -279,6 +315,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { if (select_t < 0) { throw new IOException("something is wrong"); } + // 记录结果 series.append(select_v).append("[").append(select_t).append("]").append(","); // 现在更新当前桶和左边固定点,并且把结果点加到series里 @@ -296,7 +333,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { // 一个LTTB桶里有rps个MinMax预选点(包含重复和null) if (times.get(j) != null) { long t = times.get(j); - double v = (double) values.get(j); // TODO + double v = values.get(j); double area = IOMonitor2.calculateTri(lt, lv, t, v, pnt, pnv); // 全局尾点作为右边固定点 // System.out.printf("curr=%d,t=%d,area=%f,lt=%d%n", currentBucket, t, area, lt); if (area > maxArea) { @@ -344,7 +381,9 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { StringBuilder series = new StringBuilder(); for (long localCurStartTime = startTime; - localCurStartTime + interval < endTime; // + interval to make the last bucket complete + localCurStartTime + interval <= endTime; + // 注意有等号!因为左闭右开 + // + interval to make the last bucket complete // e.g, T=11,nout=3,interval=floor(11/3)=3, // [0,3),[3,6),[6,9), no need incomplete [9,11) // then the number of buckets must be Math.floor((endTime-startTime)/interval) @@ -456,7 +495,7 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { path, allSensors, dataType, context, timeFilter, fileFilter, ascending); } else if (CONFIG.getEnableTri().equals("LTTB")) { // TODO - return new LocalGroupByExecutor( + return new LocalGroupByExecutorTri_LTTB( path, allSensors, dataType, context, timeFilter, fileFilter, ascending); } else if (CONFIG.getEnableTri().equals("ILTS")) { // TODO diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java new file mode 100644 index 00000000000..9ec058e2059 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.dataset.groupby; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.db.query.aggregation.impl.MaxValueAggrResult; +import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.filter.TsFileFilter; +import org.apache.iotdb.db.query.reader.series.SeriesReader; +import org.apache.iotdb.db.utils.FileLoaderUtils; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; +import org.apache.iotdb.tsfile.read.common.IOMonitor2; +import org.apache.iotdb.tsfile.read.filter.GroupByFilter; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.page.PageReader; +import org.apache.iotdb.tsfile.utils.Pair; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class LocalGroupByExecutorTri_LTTB implements GroupByExecutor { + + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA"); + + // Aggregate result buffer of this path + private final List<AggregateResult> results = new ArrayList<>(); + + // private List<ChunkSuit4Tri> currentChunkList; + + // private final List<ChunkSuit4Tri> futureChunkList = new ArrayList<>(); + + // keys: 0,1,...,(int) Math.floor((endTime * 1.0 - startTime) / interval)-1 + private final Map<Integer, List<ChunkSuit4Tri>> splitChunkList = new HashMap<>(); + + private final long p1t = CONFIG.getP1t(); + private final double p1v = CONFIG.getP1v(); + private final long pnt = CONFIG.getPnt(); + private final double pnv = CONFIG.getPnv(); + + private long lt = p1t; + private double lv = p1v; + // private boolean[] notEmptyBucket; + + private final int N1; // 分桶数 + + private Filter timeFilter; + + public LocalGroupByExecutorTri_LTTB( + PartialPath path, + Set<String> allSensors, + TSDataType dataType, + QueryContext context, + Filter timeFilter, + TsFileFilter fileFilter, + boolean ascending) + throws StorageEngineException, QueryProcessException { + // long start = System.nanoTime(); + + // get all data sources + QueryDataSource queryDataSource = + QueryResourceManager.getInstance().getQueryDataSource(path, context, this.timeFilter); + + // update filter by TTL + this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); + + SeriesReader seriesReader = + new SeriesReader( + path, + allSensors, + // fix bug: here use the aggregation type as the series data type, + // not using pageReader.getAllSatisfiedPageData is ok + dataType, + context, + queryDataSource, + timeFilter, + null, + fileFilter, + ascending); + + // unpackAllOverlappedFilesToTimeSeriesMetadata + try { + // : this might be bad to load all chunk metadata at first + List<ChunkSuit4Tri> futureChunkList = new ArrayList<>(); + futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri()); // no need sort here + // arrange futureChunkList into each bucket + GroupByFilter groupByFilter = (GroupByFilter) timeFilter; + long startTime = groupByFilter.getStartTime(); + long endTime = groupByFilter.getEndTime(); + long interval = groupByFilter.getInterval(); + N1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); // 分桶数 + for (ChunkSuit4Tri chunkSuit4Tri : futureChunkList) { + ChunkMetadata chunkMetadata = chunkSuit4Tri.chunkMetadata; + long chunkMinTime = chunkMetadata.getStartTime(); + long chunkMaxTime = chunkMetadata.getEndTime(); + int idx1 = (int) Math.floor((chunkMinTime - startTime) * 1.0 / interval); + int idx2 = (int) Math.floor((chunkMaxTime - startTime) * 1.0 / interval); + idx2 = (int) Math.min(idx2, N1 - 1); + for (int i = idx1; i <= idx2; i++) { + splitChunkList.computeIfAbsent(i, k -> new ArrayList<>()); + splitChunkList.get(i).add(chunkSuit4Tri); + } + } + + } catch (IOException e) { + throw new QueryProcessException(e.getMessage()); + } + + // IOMonitor2.addMeasure(Operation.M4_LSM_INIT_LOAD_ALL_CHUNKMETADATAS, System.nanoTime() - + // start); + } + + @Override + public void addAggregateResult(AggregateResult aggrResult) { + results.add(aggrResult); + } + + // private void getCurrentChunkListFromFutureChunkList(long curStartTime, long curEndTime) { + // // IOMonitor2.M4_LSM_status = Operation.M4_LSM_MERGE_M4_TIME_SPAN; + // + // // empty currentChunkList + // currentChunkList = new ArrayList<>(); + // + // // iterate futureChunkList + // ListIterator<ChunkSuit4Tri> itr = futureChunkList.listIterator(); + // while (itr.hasNext()) { + // ChunkSuit4Tri chunkSuit4Tri = itr.next(); + // ChunkMetadata chunkMetadata = chunkSuit4Tri.chunkMetadata; + // long chunkMinTime = chunkMetadata.getStartTime(); + // long chunkMaxTime = chunkMetadata.getEndTime(); + // if (chunkMaxTime < curStartTime) { + // // the chunk falls on the left side of the current M4 interval Ii + // itr.remove(); + // } else if (chunkMinTime >= curEndTime) { + // // the chunk falls on the right side of the current M4 interval Ii, + // // and since futureChunkList is ordered by the startTime of chunkMetadata, + // // the loop can be terminated early. + // break; + // } else if (chunkMaxTime < curEndTime) { + // // this chunk is not related to buckets later + // currentChunkList.add(chunkSuit4Tri); + // itr.remove(); + // } else { + // // this chunk is overlapped with the right border of the current bucket + // currentChunkList.add(chunkSuit4Tri); + // // still keep it in the futureChunkList + // } + // } + // } + + @Override + public List<AggregateResult> calcResult( + long curStartTime, long curEndTime, long startTime, long endTime, long interval) + throws IOException { + // 这里用calcResult一次返回所有buckets结果(可以把MinValueAggrResult的value设为string类型, + // 那就把所有buckets结果作为一个string返回。这样的话返回的[t]是没有意义的,只取valueString) + // 而不是像MinMax那样在nextWithoutConstraintTri_MinMax()里调用calcResult每次计算一个bucket + StringBuilder series = new StringBuilder(); + + // clear result cache + for (AggregateResult result : results) { + result.reset(); + } + + // 全局首点 + series.append(p1v).append("[").append(p1t).append("]").append(","); + + // Assume no empty buckets + for (int b = 0; b < N1; b++) { + long rt = 0; // must initialize as zero, because may be used as sum for average + double rv = 0; // must initialize as zero, because may be used as sum for average + // 计算右边桶的固定点 + if (b == N1 - 1) { // 最后一个桶 + // 全局尾点 + rt = pnt; + rv = pnv; + } else { + // 计算右边桶的平均点 + List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(b + 1); + long rightStartTime = startTime + (b + 1) * interval; + long rightEndTime = startTime + (b + 2) * interval; + int cnt = 0; + // 遍历所有与右边桶overlap的chunks + for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) { + TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); + if (dataType != TSDataType.DOUBLE) { + throw new UnSupportedDataTypeException(String.valueOf(dataType)); + } + // 1. load page data if it hasn't been loaded + if (chunkSuit4Tri.pageReader == null) { + chunkSuit4Tri.pageReader = + FileLoaderUtils.loadPageReaderList4CPV( + chunkSuit4Tri.chunkMetadata, this.timeFilter); + // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS + // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS! + } + // 2. 计算平均点 + PageReader pageReader = chunkSuit4Tri.pageReader; + for (int j = 0; j < chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) { + long timestamp = pageReader.timeBuffer.getLong(j * 8); + if (timestamp < rightStartTime) { + continue; + } else if (timestamp >= rightEndTime) { + break; + } else { // rightStartTime<=t<rightEndTime + ByteBuffer valueBuffer = pageReader.valueBuffer; + double v = valueBuffer.getDouble(pageReader.timeBufferLength + j * 8); + rt += timestamp; + rv += v; + cnt++; + } + } + } + rt = rt / cnt; + rv = rv / cnt; + } + + // 找到当前桶内距离lr连线最远的点 + double maxArea = -1; + long select_t = -1; + double select_v = -1; + List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(b); + long localCurStartTime = startTime + (b) * interval; + long localCurEndTime = startTime + (b + 1) * interval; + // 遍历所有与当前桶overlap的chunks + for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) { + TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); + if (dataType != TSDataType.DOUBLE) { + throw new UnSupportedDataTypeException(String.valueOf(dataType)); + } + // load page data if it hasn't been loaded + if (chunkSuit4Tri.pageReader == null) { + chunkSuit4Tri.pageReader = + FileLoaderUtils.loadPageReaderList4CPV(chunkSuit4Tri.chunkMetadata, this.timeFilter); + // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS + // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS! + } + PageReader pageReader = chunkSuit4Tri.pageReader; + for (int j = 0; j < chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) { + long timestamp = pageReader.timeBuffer.getLong(j * 8); + if (timestamp < localCurStartTime) { + continue; + } else if (timestamp >= localCurEndTime) { + break; + } else { // localCurStartTime<=t<localCurEndTime + ByteBuffer valueBuffer = pageReader.valueBuffer; + double v = valueBuffer.getDouble(pageReader.timeBufferLength + j * 8); + double area = IOMonitor2.calculateTri(lt, lv, timestamp, v, rt, rv); + if (area > maxArea) { + maxArea = area; + select_t = timestamp; + select_v = v; + } + } + } + } + // 记录结果 + series.append(select_v).append("[").append(select_t).append("]").append(","); + + // 更新lt,lv + // 下一个桶自然地以select_t, select_v作为左桶固定点 + lt = select_t; + lv = select_v; + } + + // 全局尾点 + series.append(pnv).append("[").append(pnt).append("]").append(","); + + MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); + minValueAggrResult.updateResult(new MinMaxInfo<>(series.toString(), 0)); + + // int idx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval); + // System.out.println("idx=" + idx); + // List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(idx); + // if (chunkSuit4TriList != null) { + // for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) { + // System.out.println(chunkSuit4Tri.chunkMetadata.getStartTime()); + // } + // } + + // long start = System.nanoTime(); + // getCurrentChunkListFromFutureChunkList(curStartTime, curEndTime); + // IOMonitor2.addMeasure(Operation.M4_LSM_MERGE_M4_TIME_SPAN, System.nanoTime() - start); + + // if (currentChunkList.size() == 0) { + // return results; + // } + + // start = System.nanoTime(); + // calculateMinMax(currentChunkList, curStartTime, curEndTime); + // IOMonitor2.addMeasure(Operation.M4_LSM_FP, System.nanoTime() - start); + + return results; + } + + private void calculateMinMax( + List<ChunkSuit4Tri> currentChunkList, long curStartTime, long curEndTime) throws IOException { + for (ChunkSuit4Tri chunkSuit4Tri : currentChunkList) { + + Statistics statistics = chunkSuit4Tri.chunkMetadata.getStatistics(); + + if (canUseStatistics(chunkSuit4Tri, curStartTime, curEndTime)) { + // update BP + MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); + minValueAggrResult.updateResult( + new MinMaxInfo<>(statistics.getMinValue(), statistics.getBottomTimestamp())); + // update TP + MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); + maxValueAggrResult.updateResult( + new MinMaxInfo<>(statistics.getMaxValue(), statistics.getTopTimestamp())); + } else { // cannot use statistics directly + + Comparable<Object> minVal = null; + long bottomTime = -1; + Comparable<Object> maxVal = null; + long topTime = -1; + + // 1. load page data if it hasn't been loaded + if (chunkSuit4Tri.pageReader == null) { + chunkSuit4Tri.pageReader = + FileLoaderUtils.loadPageReaderList4CPV(chunkSuit4Tri.chunkMetadata, this.timeFilter); + // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS + // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS! + } + + int count = chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); + PageReader pageReader = chunkSuit4Tri.pageReader; + for (int i = chunkSuit4Tri.lastReadPos; i < count; i++) { + long timestamp = pageReader.timeBuffer.getLong(i * 8); + if (timestamp < curStartTime) { + // 2. read from lastReadPos until the first point fallen within this bucket (if it + // exists) + continue; + } else if (timestamp >= curEndTime) { + // 3. traverse until the first point fallen right this bucket, also remember to update + // lastReadPos + chunkSuit4Tri.lastReadPos = i; + break; + } else { + // 4. update MinMax by traversing points fallen within this bucket + ByteBuffer valueBuffer = pageReader.valueBuffer; + TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); + Object v; + switch (dataType) { + // case INT64: + // v = valueBuffer.getLong(pageReader.timeBufferLength + i * 8); + // break; + case DOUBLE: + v = valueBuffer.getDouble(pageReader.timeBufferLength + i * 8); + break; + default: + throw new UnSupportedDataTypeException(String.valueOf(dataType)); + } + if (minVal == null || minVal.compareTo(v) > 0) { + minVal = (Comparable<Object>) v; + bottomTime = timestamp; + } + if (maxVal == null || maxVal.compareTo(v) < 0) { + maxVal = (Comparable<Object>) v; + topTime = timestamp; + } + } + } + // 4. update MinMax by traversing points fallen within this bucket + if (minVal != null) { + // update BP + MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); + minValueAggrResult.updateResult(new MinMaxInfo<>(minVal, bottomTime)); + // update TP + MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); + maxValueAggrResult.updateResult(new MinMaxInfo<>(maxVal, topTime)); + } + } + } + } + + public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long curStartTime, long curEndTime) { + long TP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getTopTimestamp(); + long BP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getBottomTimestamp(); + return TP_t >= curStartTime && TP_t < curEndTime && BP_t >= curStartTime && BP_t < curEndTime; + } + + @Override + public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime) + throws IOException { + throw new IOException("no implemented"); + } + + @Override + public List<AggregateResult> calcResult(long curStartTime, long curEndTime) + throws IOException, QueryProcessException { + throw new IOException("no implemented"); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java index ec91aa29671..445af4d839c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java @@ -298,12 +298,16 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { new MinMaxInfo<>(statistics.getMaxValue(), statistics.getTopTimestamp())); } else { // cannot use statistics directly - Comparable<Object> minVal = null; + double minVal = Double.MAX_VALUE; long bottomTime = -1; - Comparable<Object> maxVal = null; + double maxVal = Double.MIN_VALUE; long topTime = -1; // 1. load page data if it hasn't been loaded + TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); + if (dataType != TSDataType.DOUBLE) { + throw new UnSupportedDataTypeException(String.valueOf(dataType)); + } if (chunkSuit4Tri.pageReader == null) { chunkSuit4Tri.pageReader = FileLoaderUtils.loadPageReaderList4CPV(chunkSuit4Tri.chunkMetadata, this.timeFilter); @@ -330,30 +334,19 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { } else { // 4. update MinMax by traversing points fallen within this bucket ByteBuffer valueBuffer = pageReader.valueBuffer; - TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); - Object v; - switch (dataType) { - // case INT64: - // v = valueBuffer.getLong(pageReader.timeBufferLength + i * 8); - // break; - case DOUBLE: - v = valueBuffer.getDouble(pageReader.timeBufferLength + i * 8); - break; - default: - throw new UnSupportedDataTypeException(String.valueOf(dataType)); - } - if (minVal == null || minVal.compareTo(v) > 0) { - minVal = (Comparable<Object>) v; + double v = valueBuffer.getDouble(pageReader.timeBufferLength + i * 8); + if (v < minVal) { + minVal = v; bottomTime = timestamp; } - if (maxVal == null || maxVal.compareTo(v) < 0) { - maxVal = (Comparable<Object>) v; + if (v > maxVal) { + maxVal = v; topTime = timestamp; } } } // 4. update MinMax by traversing points fallen within this bucket - if (minVal != null) { + if (topTime >= 0) { // update BP MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); minValueAggrResult.updateResult(new MinMaxInfo<>(minVal, bottomTime)); diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMaxLTTB.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_LTTB.java similarity index 62% copy from server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMaxLTTB.java copy to server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_LTTB.java index 8ee541a2514..c9ff3e446b5 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMaxLTTB.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_LTTB.java @@ -39,7 +39,7 @@ import java.util.Locale; import static org.junit.Assert.fail; -public class MyTest_MinMaxLTTB { +public class MyTest_LTTB { /* * Sql format: SELECT min_value(s0), max_value(s0) ROM root.xx group by ([tqs,tqe),IntervalLength). @@ -48,6 +48,7 @@ public class MyTest_MinMaxLTTB { * (2) Assume each chunk has only one page. * (3) Assume all chunks are sequential and no deletes. * (4) Assume plain encoding, UNCOMPRESSED, Long or Double data type, no compaction + * (5) Assume no empty bucket */ private static final String TIMESTAMP_STR = "Time"; @@ -71,7 +72,7 @@ public class MyTest_MinMaxLTTB { config.setTimestampPrecision("ms"); config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION); - config.setEnableTri("MinMaxLTTB"); + config.setEnableTri("LTTB"); // config.setP1t(0); // config.setP1v(-1.2079272); // config.setPnt(2100); @@ -91,100 +92,106 @@ public class MyTest_MinMaxLTTB { EnvironmentUtils.cleanEnv(); } - // @Test - // public void test1() throws Exception { - // prepareData1(); - // String res = "0,1[20],15[2],8[25],8[25],3[54],3[54],null,null,"; - // try (Connection connection = - // DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); - // Statement statement = connection.createStatement()) { - // boolean hasResultSet = - // statement.execute( - // "SELECT min_value(s0), max_value(s0)" - // + " FROM root.vehicle.d0 group by ([0,100),25ms)"); - // Assert.assertTrue(hasResultSet); - // try (ResultSet resultSet = statement.getResultSet()) { - // int i = 0; - // while (resultSet.next()) { - // // 注意从1开始编号,所以第一列是无意义时间戳 - // String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2); - // System.out.println(ans); - // Assert.assertEquals(res, ans); - // } - // } - // System.out.println(((IoTDBStatement) statement).executeFinish()); - // } catch (Exception e) { - // e.printStackTrace(); - // fail(e.getMessage()); - // } - // } - // - // private static void prepareData1() { - // // data: - // // - // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png - // // only first chunk - // // no overlap, no delete - // try (Connection connection = - // DriverManager.getConnection( - // Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); - // Statement statement = connection.createStatement()) { - // - // for (String sql : creationSqls) { - // statement.execute(sql); - // } - // - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5.0)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15.0)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 1.0)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 25, 8.0)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 3.0)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 120, 8.0)); - // statement.execute("FLUSH"); - // - // } catch (Exception e) { - // e.printStackTrace(); - // } - // } + @Test + public void test3_2() { + prepareData3_2(); + + String res = "5.0[1],10.0[2],2.0[40],20.0[62],1.0[90],7.0[106],"; + try (Connection connection = + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + boolean hasResultSet = + statement.execute( + "SELECT min_value(s0)" + // TODO not real min_value here, actually controlled by enableTri + + " FROM root.vehicle.d0 group by ([2,106),26ms)"); + + Assert.assertTrue(hasResultSet); + try (ResultSet resultSet = statement.getResultSet()) { + int i = 0; + while (resultSet.next()) { + String ans = resultSet.getString(2); + // for LTTB all results are in the value string of MinValueAggrResult + // 因此对于LTTB来说,MinValueAggrResult的[t]也无意义 + ans = ans.substring(0, ans.length() - 3); + System.out.println(ans); + Assert.assertEquals(res, ans); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private static void prepareData3_2() { + // data: + // https://user-images.githubusercontent.com/33376433/152003603-6b4e7494-00ff-47e4-bf6e-cab3c8600ce2.png + // slightly modified + try (Connection connection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + + for (String sql : creationSqls) { + statement.execute(sql); + } + + long[] t = new long[] {1, 2, 10, 20, 22, 30, 40, 55, 60, 62, 65, 70, 72, 80, 90, 106}; + double[] v = new double[] {5, 10, 1, 5, 4, 8, 2, 5, 15, 20, 8, 18, 4, 11, 1, 7}; + config.setP1t(t[0]); + config.setP1v(v[0]); + config.setPnt(t[t.length - 1]); + config.setPnv(v[v.length - 1]); + + for (int i = 0; i < t.length; i++) { + statement.execute(String.format(Locale.ENGLISH, insertTemplate, t[i], v[i])); + if ((i + 1) % 4 == 0) { + statement.execute("FLUSH"); + } + } + statement.execute("FLUSH"); + } catch (Exception e) { + e.printStackTrace(); + } + } @Test - public void test2() throws Exception { - prepareData2(); - config.setP1t(0); - config.setP1v(-1.2079272); - config.setPnt(2100); - config.setPnv(-0.0211206); - config.setRps(4); + public void test3() { + prepareData3(); + String res = - "-1.2079272[0],1.101946[200],-1.014322[700],0.809559[1500],-0.785419[1600],-0.0211206[2100],"; + "-1.2079272[0],1.101946[200],-0.359703[400],-1.014322[700],0.532565[900]," + + "-0.676077[1300],0.809559[1500],-0.785419[1600],-0.413534[1900],-0.0211206[2100],"; try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( - "SELECT min_value(s0), max_value(s0)" + "SELECT min_value(s0)" + // TODO not real min_value here, actually controlled by enableTri + " FROM root.vehicle.d0 group by ([100,2100),250ms)"); - // rps=4,nout=6,minmaxInterval=floor((tn-t2)/((nout-2)*rps/2))=250ms + // (tn-t2)/(nout-2)=(2100-100)/(10-2)=2000/8=250 + Assert.assertTrue(hasResultSet); try (ResultSet resultSet = statement.getResultSet()) { int i = 0; while (resultSet.next()) { - // 注意从1开始编号,所以第一列是无意义时间戳 String ans = resultSet.getString(2); + // for LTTB all results are in the value string of MinValueAggrResult + // 因此对于LTTB来说,MinValueAggrResult的[t]也无意义 + ans = ans.substring(0, ans.length() - 3); System.out.println(ans); Assert.assertEquals(res, ans); } } - // System.out.println(((IoTDBStatement) statement).executeFinish()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } } - private static void prepareData2() { - // data: - // no overlap, no delete + private static void prepareData3() { try (Connection connection = DriverManager.getConnection( Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); @@ -224,11 +231,18 @@ public class MyTest_MinMaxLTTB { -0.21019539, -0.0211206 }; + config.setP1t(t[0]); + config.setP1v(v[0]); + config.setPnt(t[t.length - 1]); + config.setPnv(v[v.length - 1]); + for (int i = 0; i < t.length; i++) { statement.execute(String.format(Locale.ENGLISH, insertTemplate, t[i], v[i])); + if ((i + 1) % 4 == 0) { + statement.execute("FLUSH"); + } } statement.execute("FLUSH"); - } catch (Exception e) { e.printStackTrace(); } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java index abb46b1b54d..88d5bda26df 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java @@ -62,7 +62,7 @@ public class MyTest_MinMax { private final String d0s0 = "root.vehicle.d0.s0"; private static final String insertTemplate = - "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%d)"; + "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%f)"; private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -100,7 +100,7 @@ public class MyTest_MinMax { prepareData1(); // String[] res = new String[]{"0,1[20],15[2]", "25,8[25],8[25]", "50,3[54],3[54]", // "75,null,null"}; - String res = "0,1.0[20],15.0[2],8.0[25],8.0[25],3.0[54],3.0[54],null,null,"; + String res = "1.0[20],15.0[2],8.0[25],8.0[25],3.0[54],3.0[54],null,null,"; // 0,BPv[t]ofBucket1,TPv[t]ofBucket1,BPv[t]ofBucket2,TPv[t]ofBucket2,... try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); @@ -114,7 +114,7 @@ public class MyTest_MinMax { int i = 0; while (resultSet.next()) { // 注意从1开始编号,所以第一列是无意义时间戳 - String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2); + String ans = resultSet.getString(2); System.out.println(ans); Assert.assertEquals(res, ans); } @@ -140,12 +140,12 @@ public class MyTest_MinMax { statement.execute(sql); } - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 1)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 25, 8)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 3)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 120, 8)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 1.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 25, 8.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 3.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 120, 8.0)); statement.execute("FLUSH"); } catch (Exception e) { @@ -159,7 +159,7 @@ public class MyTest_MinMax { // String[] res = new String[]{"0,1[10],10[2]", "25,2[40],8[30]", "50,4[72],20[62]", // "75,1[90],11[80]"}; - String res = "0,1.0[10],10.0[2],2.0[40],8.0[30],4.0[72],20.0[62],1.0[90],11.0[80],"; + String res = "1.0[10],10.0[2],2.0[40],8.0[30],4.0[72],20.0[62],1.0[90],11.0[80],"; // 0,BPv[t]ofBucket1,TPv[t]ofBucket1,BPv[t]ofBucket2,TPv[t]ofBucket2,... try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); @@ -174,7 +174,7 @@ public class MyTest_MinMax { try (ResultSet resultSet = statement.getResultSet()) { int i = 0; while (resultSet.next()) { - String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2); + String ans = resultSet.getString(2); System.out.println(ans); Assert.assertEquals(res, ans); } @@ -197,28 +197,28 @@ public class MyTest_MinMax { statement.execute(sql); } - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 10)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 1)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 5)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 10.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 1.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 5.0)); statement.execute("FLUSH"); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 22, 4)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 8)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 2)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 55, 5)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 22, 4.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 8.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 2.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 55, 5.0)); statement.execute("FLUSH"); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 60, 15)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 62, 20)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 65, 8)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 70, 18)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 60, 15.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 62, 20.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 65, 8.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 70, 18.0)); statement.execute("FLUSH"); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 72, 4)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 80, 11)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 90, 1)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 105, 7)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 72, 4.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 80, 11.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 90, 1.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 105, 7.0)); statement.execute("FLUSH"); } catch (Exception e) { e.printStackTrace(); @@ -231,7 +231,7 @@ public class MyTest_MinMax { // String[] res = new String[]{"0,1[10],10[2]", "25,null,null", "50,4[72],20[62]", // "75,1[90],11[80]"}; - String res = "0,1.0[10],10.0[2],null,null,4.0[72],20.0[62],1.0[90],11.0[80],"; + String res = "1.0[10],10.0[2],null,null,4.0[72],20.0[62],1.0[90],11.0[80],"; // 0,BPv[t]ofBucket1,TPv[t]ofBucket1,BPv[t]ofBucket2,TPv[t]ofBucket2,... try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); @@ -246,7 +246,7 @@ public class MyTest_MinMax { try (ResultSet resultSet = statement.getResultSet()) { int i = 0; while (resultSet.next()) { - String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2); + String ans = resultSet.getString(2); System.out.println(ans); Assert.assertEquals(res, ans); } @@ -270,28 +270,28 @@ public class MyTest_MinMax { statement.execute(sql); } - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 10)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 1)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 5)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 10.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 1.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 5.0)); statement.execute("FLUSH"); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 22, 4)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 8)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 2)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 55, 5)); - // statement.execute("FLUSH"); + // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 22, 4.0)); + // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 8.0)); + // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 2.0)); + // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 55, 5.0)); + // statement.execute("FLUSH"); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 60, 15)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 62, 20)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 65, 8)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 70, 18)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 60, 15.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 62, 20.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 65, 8.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 70, 18.0)); statement.execute("FLUSH"); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 72, 4)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 80, 11)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 90, 1)); - statement.execute(String.format(Locale.ENGLISH, insertTemplate, 105, 7)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 72, 4.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 80, 11.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 90, 1.0)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 105, 7.0)); statement.execute("FLUSH"); } catch (Exception e) { e.printStackTrace(); diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMaxLTTB.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMaxLTTB.java index 8ee541a2514..2dfdd40ea95 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMaxLTTB.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMaxLTTB.java @@ -91,62 +91,6 @@ public class MyTest_MinMaxLTTB { EnvironmentUtils.cleanEnv(); } - // @Test - // public void test1() throws Exception { - // prepareData1(); - // String res = "0,1[20],15[2],8[25],8[25],3[54],3[54],null,null,"; - // try (Connection connection = - // DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); - // Statement statement = connection.createStatement()) { - // boolean hasResultSet = - // statement.execute( - // "SELECT min_value(s0), max_value(s0)" - // + " FROM root.vehicle.d0 group by ([0,100),25ms)"); - // Assert.assertTrue(hasResultSet); - // try (ResultSet resultSet = statement.getResultSet()) { - // int i = 0; - // while (resultSet.next()) { - // // 注意从1开始编号,所以第一列是无意义时间戳 - // String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2); - // System.out.println(ans); - // Assert.assertEquals(res, ans); - // } - // } - // System.out.println(((IoTDBStatement) statement).executeFinish()); - // } catch (Exception e) { - // e.printStackTrace(); - // fail(e.getMessage()); - // } - // } - // - // private static void prepareData1() { - // // data: - // // - // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png - // // only first chunk - // // no overlap, no delete - // try (Connection connection = - // DriverManager.getConnection( - // Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); - // Statement statement = connection.createStatement()) { - // - // for (String sql : creationSqls) { - // statement.execute(sql); - // } - // - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5.0)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15.0)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 1.0)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 25, 8.0)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 3.0)); - // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 120, 8.0)); - // statement.execute("FLUSH"); - // - // } catch (Exception e) { - // e.printStackTrace(); - // } - // } - @Test public void test2() throws Exception { prepareData2(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4Tri.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4Tri.java index 5d733e84759..0f010917e93 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4Tri.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4Tri.java @@ -26,8 +26,9 @@ public class ChunkSuit4Tri { public ChunkMetadata chunkMetadata; // fixed info, including version, dataType, stepRegress - public int lastReadPos = - 0; // dynamic maintained globally, starting from 0, incremental, never decrease + // dynamic maintained globally, starting from 0, incremental, never decrease. + // only used in LocalGroupByExecutorTri_MinMax as it never reads backward + public int lastReadPos = 0; // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
