This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch research/LTS-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 742a1b885b65592b245960c4b24de9c2a5ed9680
Author: Lei Rui <[email protected]>
AuthorDate: Mon Jul 15 02:14:43 2024 +0800

    add
---
 .../resources/conf/iotdb-engine.properties         |   2 +-
 .../groupby/GroupByWithoutValueFilterDataSet.java  |   3 +
 .../groupby/LocalGroupByExecutorTri_Uniform.java   | 396 +++++++++++++++++++++
 3 files changed, 400 insertions(+), 1 deletion(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 7cc6cfcc97a..e0d4d6e20d4 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -19,7 +19,7 @@
 ####################
 ### enable Tri
 ####################
-# MinMax, MinMaxLTTB, M4, LTTB, ILTS, SimPiece, SC, FSW
+# MinMax, MinMaxLTTB, M4, LTTB, ILTS, SimPiece, SC, FSW, Uniform
 enable_Tri=MinMax
 
 # segment error threshold for SimPiece, SC, FSW
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 5818d4b6a04..df2869e776f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -444,6 +444,9 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
     } else if (CONFIG.getEnableTri().equals("FSW")) {
       return new LocalGroupByExecutorTri_FSW(
           path, allSensors, dataType, context, timeFilter, fileFilter, 
ascending);
+    } else if (CONFIG.getEnableTri().equals("Uniform")) {
+      return new LocalGroupByExecutorTri_Uniform(
+          path, allSensors, dataType, context, timeFilter, fileFilter, 
ascending);
     } else {
       logger.info("No matched enable_tri!");
       return new LocalGroupByExecutor(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_Uniform.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_Uniform.java
new file mode 100644
index 00000000000..1145376b544
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_Uniform.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.SeriesReader;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics;
+import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri;
+import org.apache.iotdb.tsfile.read.common.IOMonitor2;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+
+public class LocalGroupByExecutorTri_Uniform implements GroupByExecutor {
+
+  // Engine configuration; calcResult reads getP1v/getP1t/getPnv/getPnt from it 
+  // for the entries it puts at the start and the end of the result string.
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
+  // Logger named "M4_CHUNK_METADATA"; the constructor writes the query 
+  // parameters and per-chunk statistics to it when debug level is enabled.
+  private static final Logger M4_CHUNK_METADATA = 
LoggerFactory.getLogger("M4_CHUNK_METADATA");
+
+  // Aggregate result buffer of this path
+  private final List<AggregateResult> results = new ArrayList<>();
+
+  // Chunks overlapping the bucket currently being processed; replaced on every
+  // call of getCurrentChunkListFromFutureChunkList.
+  private List<ChunkSuit4Tri> currentChunkList;
+  // Chunks not yet consumed, sorted by chunk start time in the constructor.
+  private final List<ChunkSuit4Tri> futureChunkList = new ArrayList<>();
+
+  // Time filter after queryDataSource.updateFilterUsingTTL; assigned in the 
+  // constructor and passed to the page loader in calculateFirst.
+  private Filter timeFilter;
+
+  // Number of buckets: floor((endTime - startTime) / interval) of the group-by 
+  // filter.
+  private final int N1;
+
+  /**
+   * Creates the executor for one series: resolves the data sources, derives the number of buckets
+   * from the group-by filter and loads the metadata of every chunk of the series, sorted by chunk
+   * start time.
+   *
+   * @param path series to query
+   * @param allSensors passed through to the {@link SeriesReader}
+   * @param dataType data type handed to the {@link SeriesReader}
+   * @param context query context
+   * @param timeFilter time filter of the query; it is cast to {@link GroupByFilter}, so any other
+   *     filter type fails with a {@link ClassCastException}
+   * @param fileFilter passed through to the {@link SeriesReader}
+   * @param ascending passed through to the {@link SeriesReader}
+   * @throws StorageEngineException declared for the data source lookup
+   * @throws QueryProcessException if loading the chunk metadata fails with an I/O error, or if
+   *     debug logging meets a chunk with an unsupported data type
+   */
+  public LocalGroupByExecutorTri_Uniform(
+      PartialPath path,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      Filter timeFilter,
+      TsFileFilter fileFilter,
+      boolean ascending)
+      throws StorageEngineException, QueryProcessException {
+    // get all data sources
+    // Use the constructor argument here: the field this.timeFilter is only assigned below and
+    // would still be null at this point.
+    QueryDataSource queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+
+    // update filter by TTL
+    this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    SeriesReader seriesReader =
+        new SeriesReader(
+            path,
+            allSensors,
+            // fix bug: here use the aggregation type as the series data type,
+            // not using pageReader.getAllSatisfiedPageData is ok
+            dataType,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            fileFilter,
+            ascending);
+
+    GroupByFilter groupByFilter = (GroupByFilter) timeFilter;
+    long startTime = groupByFilter.getStartTime();
+    long endTime = groupByFilter.getEndTime();
+    long interval = groupByFilter.getInterval();
+    N1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); // number of buckets
+
+    try {
+      // NOTE: all chunk metadata of the series is loaded up front, which may be costly
+      futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri());
+      // order futureChunkList by chunk startTime (List.sort is stable)
+      futureChunkList.sort(
+          Comparator.comparingLong(chunkSuit -> chunkSuit.chunkMetadata.getStartTime()));
+
+      if (M4_CHUNK_METADATA.isDebugEnabled()) {
+        logChunkMetadata(groupByFilter);
+      }
+    } catch (IOException e) {
+      // Keep the original I/O failure reachable as the cause of the query exception.
+      QueryProcessException wrapped = new QueryProcessException(e.getMessage());
+      wrapped.initCause(e);
+      throw wrapped;
+    }
+  }
+
+  /**
+   * Writes the group-by parameters and one line per chunk (times and values of the first, last,
+   * bottom and top points, then version, chunk header offset and point count) to the debug log.
+   *
+   * @param groupByFilter group-by filter of the query
+   * @throws QueryProcessException if a chunk has a data type other than INT32, INT64, FLOAT or
+   *     DOUBLE
+   */
+  private void logChunkMetadata(GroupByFilter groupByFilter) throws QueryProcessException {
+    M4_CHUNK_METADATA.debug(
+        "M4_QUERY_PARAM,{},{},{}",
+        groupByFilter.getStartTime(),
+        groupByFilter.getEndTime(),
+        groupByFilter.getInterval());
+
+    for (ChunkSuit4Tri chunkSuit : futureChunkList) {
+      Statistics statistics = chunkSuit.chunkMetadata.getStatistics();
+      Object firstValue;
+      Object lastValue;
+      Object bottomValue;
+      Object topValue;
+      switch (statistics.getType()) {
+        case INT32:
+          firstValue = ((IntegerStatistics) statistics).getFirstValue();
+          lastValue = ((IntegerStatistics) statistics).getLastValue();
+          bottomValue = ((IntegerStatistics) statistics).getMinValue();
+          topValue = ((IntegerStatistics) statistics).getMaxValue();
+          break;
+        case INT64:
+          firstValue = ((LongStatistics) statistics).getFirstValue();
+          lastValue = ((LongStatistics) statistics).getLastValue();
+          bottomValue = ((LongStatistics) statistics).getMinValue();
+          topValue = ((LongStatistics) statistics).getMaxValue();
+          break;
+        case FLOAT:
+          firstValue = ((FloatStatistics) statistics).getFirstValue();
+          lastValue = ((FloatStatistics) statistics).getLastValue();
+          bottomValue = ((FloatStatistics) statistics).getMinValue();
+          topValue = ((FloatStatistics) statistics).getMaxValue();
+          break;
+        case DOUBLE:
+          firstValue = ((DoubleStatistics) statistics).getFirstValue();
+          lastValue = ((DoubleStatistics) statistics).getLastValue();
+          bottomValue = ((DoubleStatistics) statistics).getMinValue();
+          topValue = ((DoubleStatistics) statistics).getMaxValue();
+          break;
+        default:
+          throw new QueryProcessException("unsupported data type!");
+      }
+      M4_CHUNK_METADATA.debug(
+          "M4_CHUNK_METADATA,{},{},{},{},{},{},{},{},{},{},{}",
+          statistics.getStartTime(),
+          statistics.getEndTime(),
+          statistics.getBottomTimestamp(),
+          statistics.getTopTimestamp(),
+          firstValue,
+          lastValue,
+          bottomValue,
+          topValue,
+          chunkSuit.chunkMetadata.getVersion(),
+          chunkSuit.chunkMetadata.getOffsetOfChunkHeader(),
+          statistics.getCount());
+    }
+  }
+
+  /**
+   * Registers an aggregate result in this executor's result buffer.
+   *
+   * @param aggrResult result object to append to the buffer
+   */
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult) {
+    results.add(aggrResult);
+  }
+
+  /**
+   * Rebuilds {@code currentChunkList} with the chunks whose time range overlaps the bucket
+   * {@code [curStartTime, curEndTime)} and drops from {@code futureChunkList} every chunk that
+   * ends before {@code curEndTime}.
+   *
+   * @param curStartTime inclusive start of the bucket
+   * @param curEndTime exclusive end of the bucket
+   */
+  private void getCurrentChunkListFromFutureChunkList(long curStartTime, long curEndTime) {
+    List<ChunkSuit4Tri> overlapping = new ArrayList<>();
+
+    for (ListIterator<ChunkSuit4Tri> cursor = futureChunkList.listIterator();
+        cursor.hasNext(); ) {
+      ChunkSuit4Tri candidate = cursor.next();
+      long chunkStart = candidate.chunkMetadata.getStartTime();
+      long chunkEnd = candidate.chunkMetadata.getEndTime();
+
+      if (chunkEnd < curStartTime) {
+        // entirely left of the bucket: never needed again
+        cursor.remove();
+        continue;
+      }
+      if (chunkStart >= curEndTime) {
+        // entirely right of the bucket; futureChunkList is ordered by chunk start time,
+        // so every remaining chunk is to the right as well
+        break;
+      }
+
+      overlapping.add(candidate);
+      if (chunkEnd < curEndTime) {
+        // ends inside this bucket, so later buckets cannot need it
+        cursor.remove();
+      }
+      // otherwise the chunk crosses the right border and stays in futureChunkList
+    }
+
+    currentChunkList = overlapping;
+  }
+
+  /**
+   * Computes the whole sampled series in one call and stores it, as a single string, in the first
+   * registered aggregate result.
+   *
+   * <p>The string is a comma-terminated list of {@code value[time]} entries: first the entry built
+   * from {@code CONFIG.getP1v()} / {@code CONFIG.getP1t()}, then exactly one entry per bucket (the
+   * first point found in the bucket, or {@code null[null]} when the bucket has none), and finally
+   * the entry built from {@code CONFIG.getPnv()} / {@code CONFIG.getPnt()}.
+   *
+   * @param curStartTime not used by this implementation
+   * @param curEndTime not used by this implementation
+   * @param startTime start time of the first bucket
+   * @param endTime not used by this implementation; the bucket count was fixed in the constructor
+   * @param interval width of one bucket
+   * @return the result buffer; element 0 must be a {@link MinValueAggrResult} and carries the
+   *     string
+   * @throws IOException if reading chunk data fails
+   */
+  @Override
+  public List<AggregateResult> calcResult(
+      long curStartTime, long curEndTime, long startTime, long endTime, long interval)
+      throws IOException {
+    StringBuilder series = new StringBuilder();
+
+    // clear result cache
+    for (AggregateResult result : results) {
+      result.reset();
+    }
+
+    series.append(CONFIG.getP1v()).append("[").append(CONFIG.getP1t()).append("]").append(",");
+
+    for (int b = 0; b < N1; b++) {
+      long localCurStartTime = startTime + b * interval;
+      long localCurEndTime = startTime + (b + 1) * interval;
+
+      getCurrentChunkListFromFutureChunkList(localCurStartTime, localCurEndTime);
+
+      if (currentChunkList.isEmpty()) {
+        // No chunk overlaps this bucket. Emit a single placeholder, the same one calculateFirst
+        // writes for a bucket whose chunks hold no point in range, so that every bucket
+        // contributes exactly one entry.
+        series.append("null[null],");
+        continue;
+      }
+
+      calculateFirst(currentChunkList, localCurStartTime, localCurEndTime, series);
+    }
+
+    series.append(CONFIG.getPnv()).append("[").append(CONFIG.getPnt()).append("]").append(",");
+
+    // The series string is handed back through the first aggregate result.
+    MinValueAggrResult minValueAggrResult = (MinValueAggrResult) results.get(0);
+    minValueAggrResult.updateResult(new MinMaxInfo<>(series.toString(), 0));
+
+    return results;
+  }
+
+  /**
+   * Appends one {@code value[time],} entry for the bucket {@code [curStartTime, curEndTime)} to
+   * {@code series}: the first point found while walking the chunks in list order, or {@code
+   * null[null],} when no point of any chunk falls inside the bucket.
+   *
+   * <p>NOTE(review): once a point has been found, the remaining chunks are still loaded and
+   * scanned up to the end of the bucket, which advances their {@code lastReadPos} and the {@code
+   * IOMonitor2} counter. The chunk that supplied the point leaves the scan without updating its
+   * {@code lastReadPos}.
+   *
+   * @param currentChunkList chunks overlapping the bucket
+   * @param curStartTime inclusive start of the bucket
+   * @param curEndTime exclusive end of the bucket
+   * @param series buffer receiving the entry
+   * @throws IOException if loading page data fails
+   */
+  private void calculateFirst(
+      List<ChunkSuit4Tri> currentChunkList,
+      long curStartTime,
+      long curEndTime,
+      StringBuilder series)
+      throws IOException {
+    // a negative time means "no point found yet"
+    long firstTime = -1;
+    double firstValue = 0;
+
+    for (ChunkSuit4Tri chunk : currentChunkList) {
+      Statistics chunkStatistics = chunk.chunkMetadata.getStatistics();
+
+      if (canUseStatistics(chunk, curStartTime, curEndTime)) {
+        // take the first point straight from the chunk statistics
+        if (firstTime < 0) {
+          firstTime = chunkStatistics.getStartTime();
+          firstValue = (double) chunkStatistics.getFirstValue();
+        }
+        continue;
+      }
+
+      // Statistics are not usable: scan the raw points. Only DOUBLE chunks are supported.
+      TSDataType chunkType = chunk.chunkMetadata.getDataType();
+      if (chunkType != TSDataType.DOUBLE) {
+        throw new UnSupportedDataTypeException(String.valueOf(chunkType));
+      }
+
+      // Load the page data lazily, once per chunk.
+      // ATTENTION (original author): the implementation assumes that a chunk contains exactly
+      // one page; with more pages the page reader would not cover the whole chunk.
+      if (chunk.pageReader == null) {
+        chunk.pageReader =
+            FileLoaderUtils.loadPageReaderList4CPV(chunk.chunkMetadata, this.timeFilter);
+      }
+
+      PageReader reader = chunk.pageReader;
+      int pointCount = chunk.chunkMetadata.getStatistics().getCount();
+
+      for (int pos = chunk.lastReadPos; pos < pointCount; pos++) {
+        IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++;
+        long timestamp = reader.timeBuffer.getLong(pos * 8);
+
+        if (timestamp < curStartTime) {
+          // still left of the bucket: keep skipping
+          continue;
+        }
+        if (timestamp >= curEndTime) {
+          // first point right of the bucket: remember where the next bucket resumes
+          chunk.lastReadPos = pos;
+          break;
+        }
+
+        // the point lies inside the bucket
+        ByteBuffer values = reader.valueBuffer;
+        double value = values.getDouble(reader.timeBufferLength + pos * 8);
+        if (firstTime < 0) {
+          firstTime = timestamp;
+          firstValue = value;
+          break;
+        }
+      }
+    }
+
+    if (firstTime < 0) {
+      // no point inside the bucket although some chunk overlaps it
+      series.append("null[null],");
+    } else {
+      // firstValue[firstTime],
+      series.append(firstValue).append("[").append(firstTime).append("]").append(",");
+    }
+  }
+
+  /**
+   * Tells whether the chunk statistics may be used instead of scanning the chunk's points.
+   *
+   * <p>This implementation always answers {@code false}; none of the arguments is inspected.
+   *
+   * @param chunkSuit4Tri chunk in question (unused)
+   * @param curStartTime start of the bucket (unused)
+   * @param curEndTime end of the bucket (unused)
+   * @return always {@code false}
+   */
+  public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long 
curStartTime, long curEndTime) {
+    return false;
+  }
+
+  /**
+   * Not implemented by this executor.
+   *
+   * @throws IOException always
+   */
+  @Override
+  public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long 
nextEndTime)
+      throws IOException {
+    throw new IOException("no implemented");
+  }
+
+  /**
+   * Not implemented by this executor; the five-argument {@code calcResult} overload holds the 
+   * implementation.
+   *
+   * @throws IOException always
+   */
+  @Override
+  public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    throw new IOException("no implemented");
+  }
+}

Reply via email to