This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/LTS-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5010a94a4ef728ad8e76c5d96174280cf0fa9bd2 Author: Lei Rui <[email protected]> AuthorDate: Sat Jan 27 17:23:42 2024 +0800 minmax --- .../resources/conf/iotdb-engine.properties | 7 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 2 + .../query/aggregation/impl/MaxValueAggrResult.java | 2 +- .../query/aggregation/impl/MinValueAggrResult.java | 2 +- .../groupby/GroupByWithoutValueFilterDataSet.java | 27 +- .../groupby/LocalGroupByExecutorTri_MinMax.java | 391 +++++++++++++++++++++ .../iotdb/db/query/reader/series/SeriesReader.java | 56 +++ .../apache/iotdb/db/integration/m4/MyTest1.java | 3 +- .../iotdb/db/integration/tri/MyTest_MinMax.java | 309 ++++++++++++++++ .../iotdb/tsfile/read/common/ChunkSuit4Tri.java | 51 +++ 11 files changed, 855 insertions(+), 7 deletions(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index 95228d09694..39258498cff 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -16,11 +16,16 @@ # specific language governing permissions and limitations # under the License. # +#################### +### enable Tri +#################### +# MinMax, MinMaxLTTB, M4LTTB, LTTB, ILTS +enable_Tri=MinMax #################### ### enable CPV #################### -enable_CPV=true +enable_CPV=false #################### ### Web Page Configuration diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index f45461a0690..2bf7caaec93 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -431,7 +431,9 @@ public class IoTDBConfig { /** Is performance tracing enable. */ private boolean enablePerformanceTracing = false; - private boolean enableCPV = true; + private boolean enableCPV = false; + + private String enableTri = ""; // MinMax, MinMaxLTTB, M4LTTB, LTTB, ILTS /** The display of stat performance interval in ms. */ private long performanceStatDisplayInterval = 60000; @@ -1386,6 +1388,10 @@ public class IoTDBConfig { return enableCPV; } + public String getEnableTri() { + return enableTri; + } + public void setEnablePerformanceTracing(boolean enablePerformanceTracing) { this.enablePerformanceTracing = enablePerformanceTracing; } @@ -1394,6 +1400,10 @@ public class IoTDBConfig { this.enableCPV = enableCPV; } + public void setEnableTri(String enableTri) { + this.enableTri = enableTri; + } + public long getPerformanceStatDisplayInterval() { return performanceStatDisplayInterval; } diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 4711470e002..e335d5ce0b9 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -539,6 +539,8 @@ public class IoTDBDescriptor { Boolean.parseBoolean( properties.getProperty("enable_CPV", Boolean.toString(conf.isEnableCPV())).trim())); + conf.setEnableTri(properties.getProperty("enable_Tri", conf.getEnableTri()).trim()); + conf.setPerformanceStatDisplayInterval( Long.parseLong( properties diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java index 27eb4a8f14e..0bdf631188e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java @@ -119,7 +119,7 @@ public class MaxValueAggrResult extends AggregateResult { protected void serializeSpecificFields(OutputStream outputStream) {} /** @author Yuyuan Kang */ - private void updateResult(MinMaxInfo maxInfo) { + public void updateResult(MinMaxInfo maxInfo) { if (maxInfo == null || maxInfo.val == null) { return; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java index 7cc3f8e3a6a..db9f74915c1 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java @@ -123,7 +123,7 @@ public class MinValueAggrResult extends AggregateResult { protected void serializeSpecificFields(OutputStream outputStream) {} /** @author Yuyuan Kang */ - private void updateResult(MinMaxInfo minInfo) { + public void updateResult(MinMaxInfo minInfo) { if (minInfo == null || minInfo.val == null) { return; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java index b23d9429e1f..07dbfecc3e3 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java @@ -113,7 +113,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { getGroupByExecutor( path, groupByTimePlan.getAllMeasurementsInDevice(path.getDevice()), - dataTypes.get(i), + dataTypes.get( + i), // fix bug: here use the aggregation type as the series data type context, timeFilter.copy(), null, @@ -200,7 +201,29 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { TsFileFilter fileFilter, boolean ascending) throws StorageEngineException, QueryProcessException { - if (CONFIG.isEnableCPV()) { + if (CONFIG.getEnableTri().equals("MinMax")) { + // TODO + return new LocalGroupByExecutorTri_MinMax( + path, allSensors, dataType, context, timeFilter, fileFilter, ascending); + } else if (CONFIG.getEnableTri().equals("MinMaxLTTB")) { + // TODO + return new LocalGroupByExecutor( + path, allSensors, dataType, context, timeFilter, fileFilter, ascending); + } else if (CONFIG.getEnableTri().equals("M4LTTB")) { + // TODO + return new LocalGroupByExecutor( + path, allSensors, dataType, context, timeFilter, fileFilter, ascending); + } else if (CONFIG.getEnableTri().equals("LTTB")) { + // TODO + return new LocalGroupByExecutor( + path, allSensors, dataType, context, timeFilter, fileFilter, ascending); + } else if (CONFIG.getEnableTri().equals("ILTS")) { + // TODO + return new LocalGroupByExecutor( + path, allSensors, dataType, context, timeFilter, fileFilter, ascending); + } + // deprecated below + else if (CONFIG.isEnableCPV()) { if (TSFileDescriptor.getInstance().getConfig().isEnableMinMaxLSM()) { // MinMax-LSM IOMonitor2.dataSetType = DataSetType.GroupByWithoutValueFilterDataSet_LocalGroupByExecutor4CPV_EnableMinMaxLSM; diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java new file mode 100644 index 00000000000..acf78c63ea4 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.dataset.groupby; + +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.db.query.aggregation.impl.MaxValueAggrResult; +import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.filter.TsFileFilter; +import org.apache.iotdb.db.query.reader.series.SeriesReader; +import org.apache.iotdb.db.utils.FileLoaderUtils; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; +import org.apache.iotdb.tsfile.read.filter.GroupByFilter; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.page.PageReader; +import org.apache.iotdb.tsfile.utils.Pair; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; + +public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { + + private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA"); + + // Aggregate result buffer of this path + private final List<AggregateResult> results = new ArrayList<>(); + + private List<ChunkSuit4Tri> currentChunkList; + private final List<ChunkSuit4Tri> futureChunkList = new ArrayList<>(); + + private Filter timeFilter; + + public LocalGroupByExecutorTri_MinMax( + PartialPath path, + Set<String> allSensors, + TSDataType dataType, + QueryContext context, + Filter timeFilter, + TsFileFilter fileFilter, + boolean ascending) + throws StorageEngineException, QueryProcessException { + // long start = System.nanoTime(); + + // get all data sources + QueryDataSource queryDataSource = + QueryResourceManager.getInstance().getQueryDataSource(path, context, this.timeFilter); + + // update filter by TTL + this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); + + SeriesReader seriesReader = + new SeriesReader( + path, + allSensors, + // fix bug: here use the aggregation type as the series data type, + // not using pageReader.getAllSatisfiedPageData is ok + dataType, + context, + queryDataSource, + timeFilter, + null, + fileFilter, + ascending); + + // unpackAllOverlappedFilesToTimeSeriesMetadata + try { + // : this might be bad to load all chunk metadata at first + futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri()); + // order futureChunkList by chunk startTime + futureChunkList.sort( + new Comparator<ChunkSuit4Tri>() { + public int compare(ChunkSuit4Tri o1, ChunkSuit4Tri o2) { + return ((Comparable) (o1.chunkMetadata.getStartTime())) + .compareTo(o2.chunkMetadata.getStartTime()); + } + }); + + if (M4_CHUNK_METADATA.isDebugEnabled()) { + if (timeFilter instanceof GroupByFilter) { + M4_CHUNK_METADATA.debug( + "M4_QUERY_PARAM,{},{},{}", + ((GroupByFilter) timeFilter).getStartTime(), + ((GroupByFilter) timeFilter).getEndTime(), + ((GroupByFilter) timeFilter).getInterval()); + } + for (ChunkSuit4Tri ChunkSuit4Tri : futureChunkList) { + Statistics statistics = ChunkSuit4Tri.chunkMetadata.getStatistics(); + long FP_t = statistics.getStartTime(); + long LP_t = statistics.getEndTime(); + long BP_t = statistics.getBottomTimestamp(); + long TP_t = statistics.getTopTimestamp(); + switch (statistics.getType()) { + case INT32: + int FP_v_int = ((IntegerStatistics) statistics).getFirstValue(); + int LP_v_int = ((IntegerStatistics) statistics).getLastValue(); + int BP_v_int = ((IntegerStatistics) statistics).getMinValue(); + int TP_v_int = ((IntegerStatistics) statistics).getMaxValue(); + M4_CHUNK_METADATA.debug( + "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", + FP_t, + LP_t, + BP_t, + TP_t, + FP_v_int, + LP_v_int, + BP_v_int, + TP_v_int, + ChunkSuit4Tri.chunkMetadata.getVersion(), + ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), + statistics.getCount()); + break; + case INT64: + long FP_v_long = ((LongStatistics) statistics).getFirstValue(); + long LP_v_long = ((LongStatistics) statistics).getLastValue(); + long BP_v_long = ((LongStatistics) statistics).getMinValue(); + long TP_v_long = ((LongStatistics) statistics).getMaxValue(); + M4_CHUNK_METADATA.debug( + "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", + FP_t, + LP_t, + BP_t, + TP_t, + FP_v_long, + LP_v_long, + BP_v_long, + TP_v_long, + ChunkSuit4Tri.chunkMetadata.getVersion(), + ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), + statistics.getCount()); + break; + case FLOAT: + float FP_v_float = ((FloatStatistics) statistics).getFirstValue(); + float LP_v_float = ((FloatStatistics) statistics).getLastValue(); + float BP_v_float = ((FloatStatistics) statistics).getMinValue(); + float TP_v_float = ((FloatStatistics) statistics).getMaxValue(); + M4_CHUNK_METADATA.debug( + "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", + FP_t, + LP_t, + BP_t, + TP_t, + FP_v_float, + LP_v_float, + BP_v_float, + TP_v_float, + ChunkSuit4Tri.chunkMetadata.getVersion(), + ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), + statistics.getCount()); + break; + case DOUBLE: + double FP_v_double = ((DoubleStatistics) statistics).getFirstValue(); + double LP_v_double = ((DoubleStatistics) statistics).getLastValue(); + double BP_v_double = ((DoubleStatistics) statistics).getMinValue(); + double TP_v_double = ((DoubleStatistics) statistics).getMaxValue(); + M4_CHUNK_METADATA.debug( + "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", + FP_t, + LP_t, + BP_t, + TP_t, + FP_v_double, + LP_v_double, + BP_v_double, + TP_v_double, + ChunkSuit4Tri.chunkMetadata.getVersion(), + ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), + statistics.getCount()); + break; + default: + throw new QueryProcessException("unsupported data type!"); + } + } + } + + } catch (IOException e) { + throw new QueryProcessException(e.getMessage()); + } + + // IOMonitor2.addMeasure(Operation.M4_LSM_INIT_LOAD_ALL_CHUNKMETADATAS, System.nanoTime() - + // start); + } + + @Override + public void addAggregateResult(AggregateResult aggrResult) { + results.add(aggrResult); + } + + private void getCurrentChunkListFromFutureChunkList(long curStartTime, long curEndTime) { + // IOMonitor2.M4_LSM_status = Operation.M4_LSM_MERGE_M4_TIME_SPAN; + + // empty currentChunkList + currentChunkList = new ArrayList<>(); + + // iterate futureChunkList + ListIterator<ChunkSuit4Tri> itr = futureChunkList.listIterator(); + while (itr.hasNext()) { + ChunkSuit4Tri chunkSuit4Tri = itr.next(); + ChunkMetadata chunkMetadata = chunkSuit4Tri.chunkMetadata; + long chunkMinTime = chunkMetadata.getStartTime(); + long chunkMaxTime = chunkMetadata.getEndTime(); + if (chunkMaxTime < curStartTime) { + // the chunk falls on the left side of the current M4 interval Ii + itr.remove(); + } else if (chunkMinTime >= curEndTime) { + // the chunk falls on the right side of the current M4 interval Ii, + // and since futureChunkList is ordered by the startTime of chunkMetadata, + // the loop can be terminated early. + break; + } else if (chunkMaxTime < curEndTime) { + // this chunk is not related to buckets later + currentChunkList.add(chunkSuit4Tri); + itr.remove(); + } else { + // this chunk is overlapped with the right border of the current bucket + currentChunkList.add(chunkSuit4Tri); + // still keep it in the futureChunkList + } + } + } + + /** + * @param curStartTime closed + * @param curEndTime open + * @param startTime closed + * @param endTime open + */ + @Override + public List<AggregateResult> calcResult( + long curStartTime, long curEndTime, long startTime, long endTime, long interval) + throws IOException { + // clear result cache + for (AggregateResult result : results) { + result.reset(); + } + + // long start = System.nanoTime(); + getCurrentChunkListFromFutureChunkList(curStartTime, curEndTime); + // IOMonitor2.addMeasure(Operation.M4_LSM_MERGE_M4_TIME_SPAN, System.nanoTime() - start); + + if (currentChunkList.size() == 0) { + return results; + } + + // start = System.nanoTime(); + calculateMinMax(currentChunkList, curStartTime, curEndTime); + // IOMonitor2.addMeasure(Operation.M4_LSM_FP, System.nanoTime() - start); + + return results; + } + + private void calculateMinMax( + List<ChunkSuit4Tri> currentChunkList, long curStartTime, long curEndTime) throws IOException { + for (ChunkSuit4Tri chunkSuit4Tri : currentChunkList) { + + Statistics statistics = chunkSuit4Tri.chunkMetadata.getStatistics(); + + if (canUseStatistics(chunkSuit4Tri, curStartTime, curEndTime)) { + // update BP + MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); + minValueAggrResult.updateResult( + new MinMaxInfo<>(statistics.getMinValue(), statistics.getBottomTimestamp())); + // update TP + MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); + maxValueAggrResult.updateResult( + new MinMaxInfo<>(statistics.getMaxValue(), statistics.getTopTimestamp())); + } else { // cannot use statistics directly + + Comparable<Object> minVal = null; + long bottomTime = -1; + Comparable<Object> maxVal = null; + long topTime = -1; + + // 1. load page data if it hasn't been loaded + if (chunkSuit4Tri.pageReader == null) { + chunkSuit4Tri.pageReader = + FileLoaderUtils.loadPageReaderList4CPV(chunkSuit4Tri.chunkMetadata, this.timeFilter); + // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS + // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS! + } + + int count = chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); + PageReader pageReader = chunkSuit4Tri.pageReader; + for (int i = chunkSuit4Tri.lastReadPos; i < count; i++) { + long timestamp = pageReader.timeBuffer.getLong(i * 8); + if (timestamp < curStartTime) { + // 2. read from lastReadPos until the first point fallen within this bucket (if it + // exists) + continue; + } else if (timestamp >= curEndTime) { + // 3. traverse until the first point fallen right this bucket, also remember to update + // lastReadPos + chunkSuit4Tri.lastReadPos = i; + break; + } else { + // 4. update MinMax by traversing points fallen within this bucket + ByteBuffer valueBuffer = pageReader.valueBuffer; + TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); + Object v; + switch (dataType) { + case INT64: + v = valueBuffer.getLong(pageReader.timeBufferLength + i * 8); + break; + case DOUBLE: + v = valueBuffer.getDouble(pageReader.timeBufferLength + i * 8); + break; + default: + throw new UnSupportedDataTypeException(String.valueOf(dataType)); + } + if (minVal == null || minVal.compareTo(v) > 0) { + minVal = (Comparable<Object>) v; + bottomTime = timestamp; + } + if (maxVal == null || maxVal.compareTo(v) < 0) { + maxVal = (Comparable<Object>) v; + topTime = timestamp; + } + } + } + // 4. update MinMax by traversing points fallen within this bucket + if (minVal != null) { + // update BP + MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); + minValueAggrResult.updateResult(new MinMaxInfo<>(minVal, bottomTime)); + // update TP + MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) results.get(1); + maxValueAggrResult.updateResult(new MinMaxInfo<>(maxVal, topTime)); + } + } + } + } + + public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long curStartTime, long curEndTime) { + long TP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getTopTimestamp(); + long BP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getBottomTimestamp(); + return TP_t >= curStartTime && TP_t < curEndTime && BP_t >= curStartTime && BP_t < curEndTime; + } + + @Override + public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime) + throws IOException { + throw new IOException("no implemented"); + } + + @Override + public List<AggregateResult> calcResult(long curStartTime, long curEndTime) + throws IOException, QueryProcessException { + throw new IOException("no implemented"); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java index 658ce00073c..988b5e101b9 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java @@ -41,6 +41,7 @@ import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.BatchDataFactory; import org.apache.iotdb.tsfile.read.common.ChunkSuit4CPV; +import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter; import org.apache.iotdb.tsfile.read.reader.IPageReader; @@ -253,6 +254,38 @@ public class SeriesReader { return chunkSuit4CPVList; } + public List<ChunkSuit4Tri> getAllChunkMetadatas4Tri() throws IOException { + List<ChunkSuit4Tri> chunkSuit4TriList = new ArrayList<>(); + while (orderUtils.hasNextUnseqResource()) { + TimeseriesMetadata timeseriesMetadata = + FileLoaderUtils.loadTimeSeriesMetadata( + orderUtils.getNextUnseqFileResource(true), + seriesPath, + context, + getAnyFilter(), + allSensors); + if (timeseriesMetadata != null) { + timeseriesMetadata.setModified(true); + timeseriesMetadata.setSeq(false); + } + unpackOneTimeSeriesMetadata4Tri(timeseriesMetadata, chunkSuit4TriList); + } + while (orderUtils.hasNextSeqResource()) { + TimeseriesMetadata timeseriesMetadata = + FileLoaderUtils.loadTimeSeriesMetadata( + orderUtils.getNextSeqFileResource(true), + seriesPath, + context, + getAnyFilter(), + allSensors); + if (timeseriesMetadata != null) { + timeseriesMetadata.setSeq(true); + } + unpackOneTimeSeriesMetadata4Tri(timeseriesMetadata, chunkSuit4TriList); + } + return chunkSuit4TriList; + } + private void unpackOneTimeSeriesMetadata4CPV( TimeseriesMetadata timeSeriesMetadata, List<ChunkSuit4CPV> chunkSuit4CPVList) throws IOException { @@ -276,6 +309,29 @@ public class SeriesReader { } } + private void unpackOneTimeSeriesMetadata4Tri( + TimeseriesMetadata timeSeriesMetadata, List<ChunkSuit4Tri> chunkSuit4TriList) + throws IOException { + List<ChunkMetadata> chunkMetadataList = + FileLoaderUtils.loadChunkMetadataList(timeSeriesMetadata); + chunkMetadataList.forEach(chunkMetadata -> chunkMetadata.setSeq(timeSeriesMetadata.isSeq())); + + // try to calculate the total number of chunk and time-value points in chunk + if (IoTDBDescriptor.getInstance().getConfig().isEnablePerformanceTracing()) { + long totalChunkPointsNum = + chunkMetadataList.stream() + .mapToLong(chunkMetadata -> chunkMetadata.getStatistics().getCount()) + .sum(); + TracingManager.getInstance() + .getTracingInfo(context.getQueryId()) + .addChunkInfo(chunkMetadataList.size(), totalChunkPointsNum); + } + + for (ChunkMetadata chunkMetadata : chunkMetadataList) { + chunkSuit4TriList.add(new ChunkSuit4Tri(chunkMetadata)); + } + } + boolean hasNextFile() throws IOException { QueryTimeManager.checkQueryAlive(context.getQueryId()); diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java index 7230a4f88a3..45bef17cbef 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java @@ -76,9 +76,10 @@ public class MyTest1 { // TSFileDescriptor.getInstance().getConfig().setUseTimeIndex(false); originalUseMad = TSFileDescriptor.getInstance().getConfig().isUseMad(); - TSFileDescriptor.getInstance().getConfig().setUseMad(true); + TSFileDescriptor.getInstance().getConfig().setUseMad(true); TSFileDescriptor.getInstance().getConfig().setErrorParam(10); + TSFileDescriptor.getInstance().getConfig().setEnableMinMaxLSM(false); EnvironmentUtils.envSetUp(); Class.forName(Config.JDBC_DRIVER_NAME); diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java new file mode 100644 index 00000000000..6f110759ca6 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.integration.tri; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.jdbc.Config; +import org.apache.iotdb.jdbc.IoTDBStatement; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Locale; + +import static org.junit.Assert.fail; + +public class MyTest_MinMax { + + /* + * Sql format: SELECT min_value(s0), max_value(s0) ROM root.xx group by ([tqs,tqe),IntervalLength). + * Requirements: + * (1) Don't change the sequence of the above two aggregates + * (2) Assume each chunk has only one page. + * (3) Assume all chunks are sequential and no deletes. + * (4) Assume plain encoding, UNCOMPRESSED, Long or Double data type, no compaction + */ + private static final String TIMESTAMP_STR = "Time"; + + private static String[] creationSqls = + new String[] { + "SET STORAGE GROUP TO root.vehicle.d0", + "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT64, ENCODING=PLAIN", + // IoTDB int data type does not support plain encoding, so use long data type + }; + + private final String d0s0 = "root.vehicle.d0.s0"; + + private static final String insertTemplate = + "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%d)"; + + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + + @Before + public void setUp() throws Exception { + TSFileDescriptor.getInstance().getConfig().setTimeEncoder("PLAIN"); + // originalCompactionStrategy = config.getCompactionStrategy(); + config.setTimestampPrecision("ms"); + config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION); + + config.setEnableTri("MinMax"); + + // 但是如果走的是unpackOneChunkMetaData(firstChunkMetadata)就没问题, + // 因为它直接用chunk元数据去构造pageReader, + // 但是如果走的是传统聚合类型->seriesAggregateReader->seriesReader->hasNextOverlappedPage里 + // cachedBatchData = BatchDataFactory.createBatchData(dataType, orderUtils.getAscending(), true) + // 这个路径就错了,把聚合类型赋给batchData了。所以这个LocalGroupByExecutor bug得在有overlap数据的时候才能复现 + // (那刚好我本文数据都不会有Overlap,可以用LocalGroupByExecutor来得到正确结果) + config.setEnableCPV(false); + TSFileDescriptor.getInstance().getConfig().setEnableMinMaxLSM(false); + TSFileDescriptor.getInstance().getConfig().setUseStatistics(false); + + EnvironmentUtils.envSetUp(); + Class.forName(Config.JDBC_DRIVER_NAME); + } + + @After + public void tearDown() throws Exception { + EnvironmentUtils.cleanEnv(); + // config.setCompactionStrategy(originalCompactionStrategy); + } + + @Test + public void test1() throws Exception { + prepareData1(); + + String[] res = + new String[] {"0,1[20],15[2]", "25,8[25],8[25]", "50,3[54],3[54]", "75,null,null"}; + try (Connection connection = + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + boolean hasResultSet = + statement.execute( + "SELECT min_value(s0), max_value(s0)" + + " FROM root.vehicle.d0 group by ([0,100),25ms)"); + Assert.assertTrue(hasResultSet); + try (ResultSet resultSet = statement.getResultSet()) { + int i = 0; + while (resultSet.next()) { + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(String.format("min_value(%s)", d0s0)) + + "," + + resultSet.getString(String.format("max_value(%s)", d0s0)); + System.out.println(ans); + Assert.assertEquals(res[i++], ans); + } + } + System.out.println(((IoTDBStatement) statement).executeFinish()); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private static void prepareData1() { + // data: + // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png + // only first chunk + // no overlap, no delete + try (Connection connection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + + for (String sql : creationSqls) { + statement.execute(sql); + } + + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 15)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 1)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 25, 8)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 54, 3)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 120, 8)); + statement.execute("FLUSH"); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void test3() { + prepareData3(); + + String[] res = + new String[] {"0,1[10],10[2]", "25,2[40],8[30]", "50,4[72],20[62]", "75,1[90],11[80]"}; + try (Connection connection = + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + boolean hasResultSet = + statement.execute( + "SELECT min_value(s0), max_value(s0)" + + " FROM root.vehicle.d0 group by ([0,100),25ms)"); // don't change the + // sequence!!! + + Assert.assertTrue(hasResultSet); + try (ResultSet resultSet = statement.getResultSet()) { + int i = 0; + while (resultSet.next()) { + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(String.format("min_value(%s)", d0s0)) + + "," + + resultSet.getString(String.format("max_value(%s)", d0s0)); + System.out.println(ans); + Assert.assertEquals(res[i++], ans); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private static void prepareData3() { + // data: + // https://user-images.githubusercontent.com/33376433/152003603-6b4e7494-00ff-47e4-bf6e-cab3c8600ce2.png + try (Connection connection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + + for (String sql : creationSqls) { + statement.execute(sql); + } + + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 10)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 1)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 5)); + statement.execute("FLUSH"); + + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 22, 4)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 8)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 2)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 55, 5)); + statement.execute("FLUSH"); + + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 60, 15)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 62, 20)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 65, 8)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 70, 18)); + statement.execute("FLUSH"); + + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 72, 4)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 80, 11)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 90, 1)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 105, 7)); + statement.execute("FLUSH"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Test + public void test3_2() { + prepareData3_2(); + + String[] res = + new String[] {"0,1[10],10[2]", "25,null,null", "50,4[72],20[62]", "75,1[90],11[80]"}; + try (Connection connection = + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + boolean hasResultSet = + statement.execute( + "SELECT min_value(s0), max_value(s0)" + + " FROM root.vehicle.d0 group by ([0,100),25ms)"); // don't change the + // sequence!!! + + Assert.assertTrue(hasResultSet); + try (ResultSet resultSet = statement.getResultSet()) { + int i = 0; + while (resultSet.next()) { + String ans = + resultSet.getString(TIMESTAMP_STR) + + "," + + resultSet.getString(String.format("min_value(%s)", d0s0)) + + "," + + resultSet.getString(String.format("max_value(%s)", d0s0)); + System.out.println(ans); + Assert.assertEquals(res[i++], ans); + } + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private static void prepareData3_2() { + // data: + // https://user-images.githubusercontent.com/33376433/152003603-6b4e7494-00ff-47e4-bf6e-cab3c8600ce2.png + // remove chunk 2 so that bucket 2 is empty + try (Connection connection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + + for (String sql : creationSqls) { + statement.execute(sql); + } + + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 1, 5)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 2, 10)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 10, 1)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 20, 5)); + statement.execute("FLUSH"); + + // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 22, 4)); + // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 30, 8)); + // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 40, 2)); + // statement.execute(String.format(Locale.ENGLISH, insertTemplate, 55, 5)); + // statement.execute("FLUSH"); + + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 60, 15)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 62, 20)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 65, 8)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 70, 18)); + statement.execute("FLUSH"); + + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 72, 4)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 80, 11)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 90, 1)); + statement.execute(String.format(Locale.ENGLISH, insertTemplate, 105, 7)); + statement.execute("FLUSH"); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4Tri.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4Tri.java new file mode 100644 index 00000000000..5d733e84759 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4Tri.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.read.common; + +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.read.reader.page.PageReader; + +public class ChunkSuit4Tri { + + public ChunkMetadata chunkMetadata; // fixed info, including version, dataType, stepRegress + + public int lastReadPos = + 0; // dynamic maintained globally, starting from 0, incremental, never decrease + + // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // WHICH WILL INTRODUCE BUGS! + public PageReader pageReader; // bears fixed plain timeBuffer and valueBuffer + // pageReader does not refer to the same deleteInterval as those in chunkMetadata + // after chunkMetadata executes insertIntoSortedDeletions + + public ChunkSuit4Tri(ChunkMetadata chunkMetadata) { + this.chunkMetadata = chunkMetadata; + this.lastReadPos = 0; + } + + public ChunkSuit4Tri(ChunkMetadata chunkMetadata, PageReader pageReader) { + this.chunkMetadata = chunkMetadata; + this.pageReader = pageReader; + this.lastReadPos = 0; + } +}
