This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/LTS-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 742a1b885b65592b245960c4b24de9c2a5ed9680 Author: Lei Rui <[email protected]> AuthorDate: Mon Jul 15 02:14:43 2024 +0800 add --- .../resources/conf/iotdb-engine.properties | 2 +- .../groupby/GroupByWithoutValueFilterDataSet.java | 3 + .../groupby/LocalGroupByExecutorTri_Uniform.java | 396 +++++++++++++++++++++ 3 files changed, 400 insertions(+), 1 deletion(-) diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties index 7cc6cfcc97a..e0d4d6e20d4 100644 --- a/server/src/assembly/resources/conf/iotdb-engine.properties +++ b/server/src/assembly/resources/conf/iotdb-engine.properties @@ -19,7 +19,7 @@ #################### ### enable Tri #################### -# MinMax, MinMaxLTTB, M4, LTTB, ILTS, SimPiece, SC, FSW +# MinMax, MinMaxLTTB, M4, LTTB, ILTS, SimPiece, SC, FSW, Uniform enable_Tri=MinMax # segment error threshold for SimPiece, SC, FSW diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java index 5818d4b6a04..df2869e776f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java @@ -444,6 +444,9 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { } else if (CONFIG.getEnableTri().equals("FSW")) { return new LocalGroupByExecutorTri_FSW( path, allSensors, dataType, context, timeFilter, fileFilter, ascending); + } else if (CONFIG.getEnableTri().equals("Uniform")) { + return new LocalGroupByExecutorTri_Uniform( + path, allSensors, dataType, context, timeFilter, fileFilter, ascending); } else { logger.info("No matched enable_tri!"); return new LocalGroupByExecutor( diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_Uniform.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_Uniform.java new file mode 100644 index 00000000000..1145376b544 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_Uniform.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.dataset.groupby; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.query.aggregation.AggregateResult; +import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.filter.TsFileFilter; +import org.apache.iotdb.db.query.reader.series.SeriesReader; +import org.apache.iotdb.db.utils.FileLoaderUtils; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; +import org.apache.iotdb.tsfile.read.common.IOMonitor2; +import org.apache.iotdb.tsfile.read.filter.GroupByFilter; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.page.PageReader; +import org.apache.iotdb.tsfile.utils.Pair; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; + +public class LocalGroupByExecutorTri_Uniform implements GroupByExecutor { + + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + + private static final Logger M4_CHUNK_METADATA = LoggerFactory.getLogger("M4_CHUNK_METADATA"); + + // Aggregate result buffer of this path + private final List<AggregateResult> results = new ArrayList<>(); + + private List<ChunkSuit4Tri> currentChunkList; + private final List<ChunkSuit4Tri> futureChunkList = new ArrayList<>(); + + private Filter timeFilter; + + private final int N1; + + public LocalGroupByExecutorTri_Uniform( + PartialPath path, + Set<String> allSensors, + TSDataType dataType, + QueryContext context, + Filter timeFilter, + TsFileFilter fileFilter, + boolean ascending) + throws StorageEngineException, QueryProcessException { + // long start = System.nanoTime(); + + // get all data sources + QueryDataSource queryDataSource = + QueryResourceManager.getInstance().getQueryDataSource(path, context, this.timeFilter); + + // update filter by TTL + this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); + + SeriesReader seriesReader = + new SeriesReader( + path, + allSensors, + // fix bug: here use the aggregation type as the series data type, + // not using pageReader.getAllSatisfiedPageData is ok + dataType, + context, + queryDataSource, + timeFilter, + null, + fileFilter, + ascending); + + GroupByFilter groupByFilter = (GroupByFilter) timeFilter; + long startTime = groupByFilter.getStartTime(); + long endTime = groupByFilter.getEndTime(); + long interval = groupByFilter.getInterval(); + N1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); // 分桶数 + + try { + // : this might be bad to load all chunk metadata at first + futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri()); + // order futureChunkList by chunk startTime + futureChunkList.sort( + new Comparator<ChunkSuit4Tri>() { + public int compare(ChunkSuit4Tri o1, ChunkSuit4Tri o2) { + return ((Comparable) (o1.chunkMetadata.getStartTime())) + .compareTo(o2.chunkMetadata.getStartTime()); + } + }); + + if (M4_CHUNK_METADATA.isDebugEnabled()) { + if (timeFilter instanceof GroupByFilter) { + M4_CHUNK_METADATA.debug( + "M4_QUERY_PARAM,{},{},{}", + ((GroupByFilter) timeFilter).getStartTime(), + ((GroupByFilter) timeFilter).getEndTime(), + ((GroupByFilter) timeFilter).getInterval()); + } + for (ChunkSuit4Tri ChunkSuit4Tri : futureChunkList) { + Statistics statistics = ChunkSuit4Tri.chunkMetadata.getStatistics(); + long FP_t = statistics.getStartTime(); + long LP_t = statistics.getEndTime(); + long BP_t = statistics.getBottomTimestamp(); + long TP_t = statistics.getTopTimestamp(); + switch (statistics.getType()) { + case INT32: + int FP_v_int = ((IntegerStatistics) statistics).getFirstValue(); + int LP_v_int = ((IntegerStatistics) statistics).getLastValue(); + int BP_v_int = ((IntegerStatistics) statistics).getMinValue(); + int TP_v_int = ((IntegerStatistics) statistics).getMaxValue(); + M4_CHUNK_METADATA.debug( + "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", + FP_t, + LP_t, + BP_t, + TP_t, + FP_v_int, + LP_v_int, + BP_v_int, + TP_v_int, + ChunkSuit4Tri.chunkMetadata.getVersion(), + ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), + statistics.getCount()); + break; + case INT64: + long FP_v_long = ((LongStatistics) statistics).getFirstValue(); + long LP_v_long = ((LongStatistics) statistics).getLastValue(); + long BP_v_long = ((LongStatistics) statistics).getMinValue(); + long TP_v_long = ((LongStatistics) statistics).getMaxValue(); + M4_CHUNK_METADATA.debug( + "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", + FP_t, + LP_t, + BP_t, + TP_t, + FP_v_long, + LP_v_long, + BP_v_long, + TP_v_long, + ChunkSuit4Tri.chunkMetadata.getVersion(), + ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), + statistics.getCount()); + break; + case FLOAT: + float FP_v_float = ((FloatStatistics) statistics).getFirstValue(); + float LP_v_float = ((FloatStatistics) statistics).getLastValue(); + float BP_v_float = ((FloatStatistics) statistics).getMinValue(); + float TP_v_float = ((FloatStatistics) statistics).getMaxValue(); + M4_CHUNK_METADATA.debug( + "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", + FP_t, + LP_t, + BP_t, + TP_t, + FP_v_float, + LP_v_float, + BP_v_float, + TP_v_float, + ChunkSuit4Tri.chunkMetadata.getVersion(), + ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), + statistics.getCount()); + break; + case DOUBLE: + double FP_v_double = ((DoubleStatistics) statistics).getFirstValue(); + double LP_v_double = ((DoubleStatistics) statistics).getLastValue(); + double BP_v_double = ((DoubleStatistics) statistics).getMinValue(); + double TP_v_double = ((DoubleStatistics) statistics).getMaxValue(); + M4_CHUNK_METADATA.debug( + "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}", + FP_t, + LP_t, + BP_t, + TP_t, + FP_v_double, + LP_v_double, + BP_v_double, + TP_v_double, + ChunkSuit4Tri.chunkMetadata.getVersion(), + ChunkSuit4Tri.chunkMetadata.getOffsetOfChunkHeader(), + statistics.getCount()); + break; + default: + throw new QueryProcessException("unsupported data type!"); + } + } + } + + } catch (IOException e) { + throw new QueryProcessException(e.getMessage()); + } + + // IOMonitor2.addMeasure(Operation.M4_LSM_INIT_LOAD_ALL_CHUNKMETADATAS, System.nanoTime() - + // start); + } + + @Override + public void addAggregateResult(AggregateResult aggrResult) { + results.add(aggrResult); + } + + private void getCurrentChunkListFromFutureChunkList(long curStartTime, long curEndTime) { + // IOMonitor2.M4_LSM_status = Operation.M4_LSM_MERGE_M4_TIME_SPAN; + + // empty currentChunkList + currentChunkList = new ArrayList<>(); + + // iterate futureChunkList + ListIterator<ChunkSuit4Tri> itr = futureChunkList.listIterator(); + while (itr.hasNext()) { + ChunkSuit4Tri chunkSuit4Tri = itr.next(); + ChunkMetadata chunkMetadata = chunkSuit4Tri.chunkMetadata; + long chunkMinTime = chunkMetadata.getStartTime(); + long chunkMaxTime = chunkMetadata.getEndTime(); + if (chunkMaxTime < curStartTime) { + // the chunk falls on the left side of the current M4 interval Ii + itr.remove(); + } else if (chunkMinTime >= curEndTime) { + // the chunk falls on the right side of the current M4 interval Ii, + // and since futureChunkList is ordered by the startTime of chunkMetadata, + // the loop can be terminated early. + break; + } else if (chunkMaxTime < curEndTime) { + // this chunk is not related to buckets later + currentChunkList.add(chunkSuit4Tri); + itr.remove(); + } else { + // this chunk is overlapped with the right border of the current bucket + currentChunkList.add(chunkSuit4Tri); + // still keep it in the futureChunkList + } + } + } + + @Override + public List<AggregateResult> calcResult( + long curStartTime, long curEndTime, long startTime, long endTime, long interval) + throws IOException { + StringBuilder series = new StringBuilder(); + + // clear result cache + for (AggregateResult result : results) { + result.reset(); + } + + series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(","); + + // Assume no empty buckets + for (int b = 0; b < N1; b++) { + long localCurStartTime = startTime + (b) * interval; + long localCurEndTime = startTime + (b + 1) * interval; + + getCurrentChunkListFromFutureChunkList(localCurStartTime, localCurEndTime); + + if (currentChunkList.size() == 0) { + // minValue[bottomTime],maxValue[topTime],firstValue[firstTime],lastValue[lastTime] + series.append("null[null],null[null],null[null],null[null],"); + continue; + } + + calculateFirst(currentChunkList, localCurStartTime, localCurEndTime, series); + } + + series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(","); + + MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0); + minValueAggrResult.updateResult(new MinMaxInfo<>(series.toString(), 0)); + + return results; + } + + private void calculateFirst( + List<ChunkSuit4Tri> currentChunkList, + long curStartTime, + long curEndTime, + StringBuilder series) + throws IOException { + long firstTime = -1; + double firstValue = 0; + + for (ChunkSuit4Tri chunkSuit4Tri : currentChunkList) { + + Statistics statistics = chunkSuit4Tri.chunkMetadata.getStatistics(); + + if (canUseStatistics(chunkSuit4Tri, curStartTime, curEndTime)) { + // update FP + if (firstTime < 0) { + firstTime = statistics.getStartTime(); + firstValue = (double) statistics.getFirstValue(); + } + } else { // cannot use statistics directly + // 1. load page data if it hasn't been loaded + TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType(); + if (dataType != TSDataType.DOUBLE) { + throw new UnSupportedDataTypeException(String.valueOf(dataType)); + } + if (chunkSuit4Tri.pageReader == null) { + chunkSuit4Tri.pageReader = + FileLoaderUtils.loadPageReaderList4CPV(chunkSuit4Tri.chunkMetadata, this.timeFilter); + // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS + // ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS! + } + + int count = chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); + PageReader pageReader = chunkSuit4Tri.pageReader; + int i; + for (i = chunkSuit4Tri.lastReadPos; i < count; i++) { + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; + long timestamp = pageReader.timeBuffer.getLong(i * 8); + if (timestamp < curStartTime) { + // 2. read from lastReadPos until the first point fallen within this bucket (if it + // exists) + continue; + } else if (timestamp >= curEndTime) { + // 3. traverse until the first point fallen right this bucket, also remember to update + // lastReadPos + chunkSuit4Tri.lastReadPos = i; + break; + } else { + // 4. update MinMax by traversing points fallen within this bucket + ByteBuffer valueBuffer = pageReader.valueBuffer; + double v = valueBuffer.getDouble(pageReader.timeBufferLength + i * 8); + if (firstTime < 0) { + firstTime = timestamp; + firstValue = v; + break; + } + } + } + } + } + if (firstTime >= 0) { + // minValue[bottomTime],maxValue[topTime],firstValue[firstTime],lastValue[lastTime] + series.append(firstValue).append("[").append(firstTime).append("]").append(","); + } else { + // empty bucket although statistics cover + // minValue[bottomTime],maxValue[topTime],firstValue[firstTime],lastValue[lastTime] + series.append("null[null],"); + } + } + + public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long curStartTime, long curEndTime) { + return false; + } + + @Override + public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime) + throws IOException { + throw new IOException("no implemented"); + } + + @Override + public List<AggregateResult> calcResult(long curStartTime, long curEndTime) + throws IOException, QueryProcessException { + throw new IOException("no implemented"); + } +}
