This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch research/LTS-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5b4209916ce0495c2b5650c00e8e8713a4702fc1
Author: Lei Rui <[email protected]>
AuthorDate: Sun Jan 28 15:23:46 2024 +0800

    add iteration
---
 .../groupby/GroupByWithoutValueFilterDataSet.java  |  12 +-
 .../groupby/LocalGroupByExecutorTri_ILTS.java      | 316 +++++++++++++++++++++
 .../groupby/LocalGroupByExecutorTri_LTTB.java      | 151 +---------
 .../iotdb/db/integration/tri/MyTest_ILTS.java      | 248 ++++++++++++++++
 4 files changed, 569 insertions(+), 158 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 913158a5021..c349d9dc3c9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -142,14 +142,9 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
     }
     //    } else if (CONFIG.getEnableTri().equals("M4LTTB")) {
     //      // TODO
-    else if (CONFIG.getEnableTri().equals("LTTB")) {
-      // TODO
+    else if (CONFIG.getEnableTri().equals("LTTB") || 
CONFIG.getEnableTri().equals("ILTS")) {
       return nextWithoutConstraintTri_LTTB();
-    }
-    //    } else if (CONFIG.getEnableTri().equals("ILTS")) {
-    //      // TODO
-    //    }
-    else {
+    } else {
       return nextWithoutConstraint_raw();
     }
   }
@@ -494,12 +489,11 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
       return new LocalGroupByExecutor(
           path, allSensors, dataType, context, timeFilter, fileFilter, 
ascending);
     } else if (CONFIG.getEnableTri().equals("LTTB")) {
-      // TODO
       return new LocalGroupByExecutorTri_LTTB(
           path, allSensors, dataType, context, timeFilter, fileFilter, 
ascending);
     } else if (CONFIG.getEnableTri().equals("ILTS")) {
       // TODO
-      return new LocalGroupByExecutor(
+      return new LocalGroupByExecutorTri_ILTS(
           path, allSensors, dataType, context, timeFilter, fileFilter, 
ascending);
     }
     // deprecated below
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java
new file mode 100644
index 00000000000..73f9fd395d7
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.SeriesReader;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo;
+import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri;
+import org.apache.iotdb.tsfile.read.common.IOMonitor2;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class LocalGroupByExecutorTri_ILTS implements GroupByExecutor {
+
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger M4_CHUNK_METADATA = 
LoggerFactory.getLogger("M4_CHUNK_METADATA");
+
+  // Aggregate result buffer of this path
+  private final List<AggregateResult> results = new ArrayList<>();
+
+  //  private List<ChunkSuit4Tri> currentChunkList;
+
+  //  private final List<ChunkSuit4Tri> futureChunkList = new ArrayList<>();
+
+  // keys: 0,1,...,(int) Math.floor((endTime * 1.0 - startTime) / interval)-1
+  private final Map<Integer, List<ChunkSuit4Tri>> splitChunkList = new 
HashMap<>();
+
+  private final long p1t = CONFIG.getP1t();
+  private final double p1v = CONFIG.getP1v();
+  private final long pnt = CONFIG.getPnt();
+  private final double pnv = CONFIG.getPnv();
+
+  private long lt = p1t;
+  private double lv = p1v;
+
+  private final int N1; // 分桶数
+
+  private static final int numIterations = 8;
+
+  private Filter timeFilter;
+
+  public LocalGroupByExecutorTri_ILTS(
+      PartialPath path,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      Filter timeFilter,
+      TsFileFilter fileFilter,
+      boolean ascending)
+      throws StorageEngineException, QueryProcessException {
+    //    long start = System.nanoTime();
+
+    // get all data sources
+    QueryDataSource queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, 
this.timeFilter);
+
+    // update filter by TTL
+    this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    SeriesReader seriesReader =
+        new SeriesReader(
+            path,
+            allSensors,
+            // fix bug: here use the aggregation type as the series data type,
+            // not using pageReader.getAllSatisfiedPageData is ok
+            dataType,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            fileFilter,
+            ascending);
+
+    // unpackAllOverlappedFilesToTimeSeriesMetadata
+    try {
+      // NOTE: it might be bad to load all chunk metadata at first
+      List<ChunkSuit4Tri> futureChunkList =
+          new ArrayList<>(seriesReader.getAllChunkMetadatas4Tri()); // no need 
sort here
+      // arrange futureChunkList into each bucket
+      GroupByFilter groupByFilter = (GroupByFilter) timeFilter;
+      long startTime = groupByFilter.getStartTime();
+      long endTime = groupByFilter.getEndTime();
+      long interval = groupByFilter.getInterval();
+      N1 = (int) Math.floor((endTime * 1.0 - startTime) / interval); // 分桶数
+      for (ChunkSuit4Tri chunkSuit4Tri : futureChunkList) {
+        ChunkMetadata chunkMetadata = chunkSuit4Tri.chunkMetadata;
+        long chunkMinTime = chunkMetadata.getStartTime();
+        long chunkMaxTime = chunkMetadata.getEndTime();
+        int idx1 = (int) Math.floor((chunkMinTime - startTime) * 1.0 / 
interval);
+        int idx2 = (int) Math.floor((chunkMaxTime - startTime) * 1.0 / 
interval);
+        idx2 = (int) Math.min(idx2, N1 - 1);
+        for (int i = idx1; i <= idx2; i++) {
+          splitChunkList.computeIfAbsent(i, k -> new ArrayList<>());
+          splitChunkList.get(i).add(chunkSuit4Tri);
+        }
+      }
+
+    } catch (IOException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+
+    //    IOMonitor2.addMeasure(Operation.M4_LSM_INIT_LOAD_ALL_CHUNKMETADATAS, 
System.nanoTime() -
+    // start);
+  }
+
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult) {
+    results.add(aggrResult);
+  }
+
+  @Override
+  public List<AggregateResult> calcResult(
+      long curStartTime, long curEndTime, long startTime, long endTime, long 
interval)
+      throws IOException {
+    // 这里用calcResult一次返回所有buckets结果(可以把MinValueAggrResult的value设为string类型,
+    // 那就把所有buckets结果作为一个string返回。这样的话返回的[t]是没有意义的,只取valueString)
+    // 而不是像MinMax那样在nextWithoutConstraintTri_MinMax()里调用calcResult每次计算一个bucket
+    StringBuilder series_final = new StringBuilder();
+
+    // clear result cache
+    for (AggregateResult result : results) {
+      result.reset();
+    }
+
+    long[] lastIter_t = new long[N1]; // N1不包括全局首尾点
+    double[] lastIter_v = new double[N1]; // N1不包括全局首尾点
+    for (int num = 0; num < numIterations; num++) {
+      StringBuilder series = new StringBuilder();
+      // 全局首点
+      series.append(p1v).append("[").append(p1t).append("]").append(",");
+      // 遍历分桶 Assume no empty buckets
+      for (int b = 0; b < N1; b++) {
+        long rt = 0; // must initialize as zero, because may be used as sum 
for average
+        double rv = 0; // must initialize as zero, because may be used as sum 
for average
+        // 计算右边桶的固定点
+        if (b == N1 - 1) { // 最后一个桶
+          // 全局尾点
+          rt = pnt;
+          rv = pnv;
+        } else { // 不是最后一个桶
+          if (num == 0) { // 是第一次迭代的话,就使用右边桶的平均点
+            // ========计算右边桶的平均点========
+            List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(b + 1);
+            long rightStartTime = startTime + (b + 1) * interval;
+            long rightEndTime = startTime + (b + 2) * interval;
+            int cnt = 0;
+            // 遍历所有与右边桶overlap的chunks
+            for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) {
+              TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType();
+              if (dataType != TSDataType.DOUBLE) {
+                throw new 
UnSupportedDataTypeException(String.valueOf(dataType));
+              }
+              // 1. load page data if it hasn't been loaded
+              if (chunkSuit4Tri.pageReader == null) {
+                chunkSuit4Tri.pageReader =
+                    FileLoaderUtils.loadPageReaderList4CPV(
+                        chunkSuit4Tri.chunkMetadata, this.timeFilter);
+                //  ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE 
IN A CHUNK,
+                //  BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS 
ASSUMPTION.
+                //  OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK 
WHILE
+                //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE 
OF STEPREGRESS IS
+                //  ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS!
+              }
+              // 2. 计算平均点
+              PageReader pageReader = chunkSuit4Tri.pageReader;
+              for (int j = 0; j < 
chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) {
+                long timestamp = pageReader.timeBuffer.getLong(j * 8);
+                if (timestamp < rightStartTime) {
+                  continue;
+                } else if (timestamp >= rightEndTime) {
+                  break;
+                } else { // rightStartTime<=t<rightEndTime
+                  ByteBuffer valueBuffer = pageReader.valueBuffer;
+                  double v = valueBuffer.getDouble(pageReader.timeBufferLength 
+ j * 8);
+                  rt += timestamp;
+                  rv += v;
+                  cnt++;
+                }
+              }
+            }
+            rt = rt / cnt;
+            rv = rv / cnt;
+          } else { // 不是第一次迭代也不是最后一个桶的话,就使用上一轮迭代右边桶的采样点
+            rt = lastIter_t[b + 1];
+            rv = lastIter_v[b + 1];
+          }
+        }
+        // ========找到当前桶内距离lr连线最远的点========
+        double maxArea = -1;
+        long select_t = -1;
+        double select_v = -1;
+        List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(b);
+        long localCurStartTime = startTime + (b) * interval;
+        long localCurEndTime = startTime + (b + 1) * interval;
+        // 遍历所有与当前桶overlap的chunks
+        for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) {
+          TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType();
+          if (dataType != TSDataType.DOUBLE) {
+            throw new UnSupportedDataTypeException(String.valueOf(dataType));
+          }
+          // load page data if it hasn't been loaded
+          if (chunkSuit4Tri.pageReader == null) {
+            chunkSuit4Tri.pageReader =
+                FileLoaderUtils.loadPageReaderList4CPV(
+                    chunkSuit4Tri.chunkMetadata, this.timeFilter);
+            //  ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A 
CHUNK,
+            //  BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
+            //  OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
+            //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS
+            //  ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS!
+          }
+          PageReader pageReader = chunkSuit4Tri.pageReader;
+          for (int j = 0; j < 
chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); j++) {
+            long timestamp = pageReader.timeBuffer.getLong(j * 8);
+            if (timestamp < localCurStartTime) {
+              continue;
+            } else if (timestamp >= localCurEndTime) {
+              break;
+            } else { // localCurStartTime<=t<localCurEndTime
+              ByteBuffer valueBuffer = pageReader.valueBuffer;
+              double v = valueBuffer.getDouble(pageReader.timeBufferLength + j 
* 8);
+              double area = IOMonitor2.calculateTri(lt, lv, timestamp, v, rt, 
rv);
+              if (area > maxArea) {
+                maxArea = area;
+                select_t = timestamp;
+                select_v = v;
+              }
+            }
+          }
+        }
+        // 记录结果
+        
series.append(select_v).append("[").append(select_t).append("]").append(",");
+
+        // 更新lt,lv
+        // 下一个桶自然地以select_t, select_v作为左桶固定点
+        lt = select_t;
+        lv = select_v;
+        // 记录本轮迭代本桶选点
+        lastIter_t[b] = select_t;
+        lastIter_v[b] = select_v;
+      } // 遍历分桶结束
+      // 全局尾点
+      series.append(pnv).append("[").append(pnt).append("]").append(",");
+
+      System.out.println(series);
+    } // end Iterations
+
+    // 全局首点
+    series_final.append(p1v).append("[").append(p1t).append("]").append(",");
+    for (int i = 0; i < lastIter_t.length; i++) {
+      
series_final.append(lastIter_v[i]).append("[").append(lastIter_t[i]).append("]").append(",");
+    }
+    // 全局尾点
+    series_final.append(pnv).append("[").append(pnt).append("]").append(",");
+    MinValueAggrResult minValueAggrResult = (MinValueAggrResult) 
results.get(0);
+    minValueAggrResult.updateResult(new MinMaxInfo<>(series_final.toString(), 
0));
+
+    return results;
+  }
+
+  @Override
+  public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long 
nextEndTime)
+      throws IOException {
+    throw new IOException("no implemented");
+  }
+
+  @Override
+  public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    throw new IOException("no implemented");
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java
index 9ec058e2059..be84be70c38 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_LTTB.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
-import org.apache.iotdb.db.query.aggregation.impl.MaxValueAggrResult;
 import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -37,7 +36,6 @@ import 
org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri;
 import org.apache.iotdb.tsfile.read.common.IOMonitor2;
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
@@ -78,7 +76,6 @@ public class LocalGroupByExecutorTri_LTTB implements 
GroupByExecutor {
 
   private long lt = p1t;
   private double lv = p1v;
-  //  private boolean[] notEmptyBucket;
 
   private final int N1; // 分桶数
 
@@ -119,8 +116,8 @@ public class LocalGroupByExecutorTri_LTTB implements 
GroupByExecutor {
     // unpackAllOverlappedFilesToTimeSeriesMetadata
     try {
       // : this might be bad to load all chunk metadata at first
-      List<ChunkSuit4Tri> futureChunkList = new ArrayList<>();
-      futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri()); // no 
need sort here
+      List<ChunkSuit4Tri> futureChunkList =
+          new ArrayList<>(seriesReader.getAllChunkMetadatas4Tri()); // no need 
sort here
       // arrange futureChunkList into each bucket
       GroupByFilter groupByFilter = (GroupByFilter) timeFilter;
       long startTime = groupByFilter.getStartTime();
@@ -153,39 +150,6 @@ public class LocalGroupByExecutorTri_LTTB implements 
GroupByExecutor {
     results.add(aggrResult);
   }
 
-  //  private void getCurrentChunkListFromFutureChunkList(long curStartTime, 
long curEndTime) {
-  //    //    IOMonitor2.M4_LSM_status = Operation.M4_LSM_MERGE_M4_TIME_SPAN;
-  //
-  //    // empty currentChunkList
-  //    currentChunkList = new ArrayList<>();
-  //
-  //    // iterate futureChunkList
-  //    ListIterator<ChunkSuit4Tri> itr = futureChunkList.listIterator();
-  //    while (itr.hasNext()) {
-  //      ChunkSuit4Tri chunkSuit4Tri = itr.next();
-  //      ChunkMetadata chunkMetadata = chunkSuit4Tri.chunkMetadata;
-  //      long chunkMinTime = chunkMetadata.getStartTime();
-  //      long chunkMaxTime = chunkMetadata.getEndTime();
-  //      if (chunkMaxTime < curStartTime) {
-  //        // the chunk falls on the left side of the current M4 interval Ii
-  //        itr.remove();
-  //      } else if (chunkMinTime >= curEndTime) {
-  //        // the chunk falls on the right side of the current M4 interval Ii,
-  //        // and since futureChunkList is ordered by the startTime of 
chunkMetadata,
-  //        // the loop can be terminated early.
-  //        break;
-  //      } else if (chunkMaxTime < curEndTime) {
-  //        // this chunk is not related to buckets later
-  //        currentChunkList.add(chunkSuit4Tri);
-  //        itr.remove();
-  //      } else {
-  //        // this chunk is overlapped with the right border of the current 
bucket
-  //        currentChunkList.add(chunkSuit4Tri);
-  //        // still keep it in the futureChunkList
-  //      }
-  //    }
-  //  }
-
   @Override
   public List<AggregateResult> calcResult(
       long curStartTime, long curEndTime, long startTime, long endTime, long 
interval)
@@ -313,120 +277,9 @@ public class LocalGroupByExecutorTri_LTTB implements 
GroupByExecutor {
     MinValueAggrResult minValueAggrResult = (MinValueAggrResult) 
results.get(0);
     minValueAggrResult.updateResult(new MinMaxInfo<>(series.toString(), 0));
 
-    //    int idx = (int) Math.floor((curStartTime - startTime) * 1.0 / 
interval);
-    //    System.out.println("idx=" + idx);
-    //    List<ChunkSuit4Tri> chunkSuit4TriList = splitChunkList.get(idx);
-    //    if (chunkSuit4TriList != null) {
-    //      for (ChunkSuit4Tri chunkSuit4Tri : chunkSuit4TriList) {
-    //        System.out.println(chunkSuit4Tri.chunkMetadata.getStartTime());
-    //      }
-    //    }
-
-    //    long start = System.nanoTime();
-    //    getCurrentChunkListFromFutureChunkList(curStartTime, curEndTime);
-    //    IOMonitor2.addMeasure(Operation.M4_LSM_MERGE_M4_TIME_SPAN, 
System.nanoTime() - start);
-
-    //    if (currentChunkList.size() == 0) {
-    //      return results;
-    //    }
-
-    //    start = System.nanoTime();
-    //    calculateMinMax(currentChunkList, curStartTime, curEndTime);
-    //    IOMonitor2.addMeasure(Operation.M4_LSM_FP, System.nanoTime() - 
start);
-
     return results;
   }
 
-  private void calculateMinMax(
-      List<ChunkSuit4Tri> currentChunkList, long curStartTime, long 
curEndTime) throws IOException {
-    for (ChunkSuit4Tri chunkSuit4Tri : currentChunkList) {
-
-      Statistics statistics = chunkSuit4Tri.chunkMetadata.getStatistics();
-
-      if (canUseStatistics(chunkSuit4Tri, curStartTime, curEndTime)) {
-        // update BP
-        MinValueAggrResult minValueAggrResult = (MinValueAggrResult) 
results.get(0);
-        minValueAggrResult.updateResult(
-            new MinMaxInfo<>(statistics.getMinValue(), 
statistics.getBottomTimestamp()));
-        // update TP
-        MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) 
results.get(1);
-        maxValueAggrResult.updateResult(
-            new MinMaxInfo<>(statistics.getMaxValue(), 
statistics.getTopTimestamp()));
-      } else { // cannot use statistics directly
-
-        Comparable<Object> minVal = null;
-        long bottomTime = -1;
-        Comparable<Object> maxVal = null;
-        long topTime = -1;
-
-        // 1. load page data if it hasn't been loaded
-        if (chunkSuit4Tri.pageReader == null) {
-          chunkSuit4Tri.pageReader =
-              
FileLoaderUtils.loadPageReaderList4CPV(chunkSuit4Tri.chunkMetadata, 
this.timeFilter);
-          //  ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A 
CHUNK,
-          //  BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION.
-          //  OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE
-          //  STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF 
STEPREGRESS IS
-          //  ASSIGN DIRECTLY), WHICH WILL INTRODUCE BUGS!
-        }
-
-        int count = chunkSuit4Tri.chunkMetadata.getStatistics().getCount();
-        PageReader pageReader = chunkSuit4Tri.pageReader;
-        for (int i = chunkSuit4Tri.lastReadPos; i < count; i++) {
-          long timestamp = pageReader.timeBuffer.getLong(i * 8);
-          if (timestamp < curStartTime) {
-            // 2. read from lastReadPos until the first point fallen within 
this bucket (if it
-            // exists)
-            continue;
-          } else if (timestamp >= curEndTime) {
-            // 3. traverse until the first point fallen right this bucket, 
also remember to update
-            // lastReadPos
-            chunkSuit4Tri.lastReadPos = i;
-            break;
-          } else {
-            // 4. update MinMax by traversing points fallen within this bucket
-            ByteBuffer valueBuffer = pageReader.valueBuffer;
-            TSDataType dataType = chunkSuit4Tri.chunkMetadata.getDataType();
-            Object v;
-            switch (dataType) {
-                //              case INT64:
-                //                v = 
valueBuffer.getLong(pageReader.timeBufferLength + i * 8);
-                //                break;
-              case DOUBLE:
-                v = valueBuffer.getDouble(pageReader.timeBufferLength + i * 8);
-                break;
-              default:
-                throw new 
UnSupportedDataTypeException(String.valueOf(dataType));
-            }
-            if (minVal == null || minVal.compareTo(v) > 0) {
-              minVal = (Comparable<Object>) v;
-              bottomTime = timestamp;
-            }
-            if (maxVal == null || maxVal.compareTo(v) < 0) {
-              maxVal = (Comparable<Object>) v;
-              topTime = timestamp;
-            }
-          }
-        }
-        // 4. update MinMax by traversing points fallen within this bucket
-        if (minVal != null) {
-          // update BP
-          MinValueAggrResult minValueAggrResult = (MinValueAggrResult) 
results.get(0);
-          minValueAggrResult.updateResult(new MinMaxInfo<>(minVal, 
bottomTime));
-          // update TP
-          MaxValueAggrResult maxValueAggrResult = (MaxValueAggrResult) 
results.get(1);
-          maxValueAggrResult.updateResult(new MinMaxInfo<>(maxVal, topTime));
-        }
-      }
-    }
-  }
-
-  public boolean canUseStatistics(ChunkSuit4Tri chunkSuit4Tri, long 
curStartTime, long curEndTime) {
-    long TP_t = chunkSuit4Tri.chunkMetadata.getStatistics().getTopTimestamp();
-    long BP_t = 
chunkSuit4Tri.chunkMetadata.getStatistics().getBottomTimestamp();
-    return TP_t >= curStartTime && TP_t < curEndTime && BP_t >= curStartTime 
&& BP_t < curEndTime;
-  }
-
   @Override
   public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long 
nextEndTime)
       throws IOException {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java 
b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java
new file mode 100644
index 00000000000..d2381e6fa75
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.tri;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static org.junit.Assert.fail;
+
+public class MyTest_ILTS {
+
+  /*
+   * Sql format: SELECT min_value(s0), max_value(s0) FROM root.xx group by 
([tqs,tqe),IntervalLength).
+   * Requirements:
+   * (1) Don't change the sequence of the above two aggregates
+   * (2) Assume each chunk has only one page.
+   * (3) Assume all chunks are sequential and no deletes.
+   * (4) Assume plain encoding, UNCOMPRESSED, Long or Double data type, no 
compaction
+   * (5) Assume no empty bucket
+   */
+  private static final String TIMESTAMP_STR = "Time";
+
+  private static String[] creationSqls =
+      new String[] {
+        "SET STORAGE GROUP TO root.vehicle.d0",
+        "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=DOUBLE, 
ENCODING=PLAIN",
+        // IoTDB int data type does not support plain encoding, so use long 
data type
+      };
+
+  private final String d0s0 = "root.vehicle.d0.s0";
+
+  private static final String insertTemplate =
+      "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%f)";
+
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  @Before
+  public void setUp() throws Exception {
+    TSFileDescriptor.getInstance().getConfig().setTimeEncoder("PLAIN");
+    config.setTimestampPrecision("ms");
+    config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+
+    config.setEnableTri("ILTS");
+
+    config.setEnableCPV(false);
+    TSFileDescriptor.getInstance().getConfig().setEnableMinMaxLSM(false);
+    TSFileDescriptor.getInstance().getConfig().setUseStatistics(false);
+
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void test3_2() {
+    prepareData3_2();
+
+    String res = "5.0[1],10.0[2],2.0[40],5.0[55],20.0[62],1.0[90],7.0[102],";
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "SELECT min_value(s0)"
+                  // TODO not real min_value here, actually controlled by 
enableTri
+                  + " FROM root.vehicle.d0 group by ([2,102),20ms)");
+      // (102-2)/(7-2)=20ms
+      // note keep no empty buckets
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int i = 0;
+        while (resultSet.next()) {
+          String ans = resultSet.getString(2);
+          // for LTTB all results are in the value string of MinValueAggrResult
+          // 因此对于LTTB来说,MinValueAggrResult的[t]也无意义
+          ans = ans.substring(0, ans.length() - 3);
+          System.out.println(ans);
+          Assert.assertEquals(res, ans);
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private static void prepareData3_2() {
+    // data:
+    // 
https://user-images.githubusercontent.com/33376433/152003603-6b4e7494-00ff-47e4-bf6e-cab3c8600ce2.png
+    // slightly modified
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+
+      long[] t = new long[] {1, 2, 10, 20, 22, 30, 40, 55, 60, 62, 65, 70, 72, 
80, 90, 102};
+      double[] v = new double[] {5, 10, 1, 5, 4, 8, 2, 5, 15, 20, 8, 18, 4, 
11, 1, 7};
+      config.setP1t(t[0]);
+      config.setP1v(v[0]);
+      config.setPnt(t[t.length - 1]);
+      config.setPnv(v[v.length - 1]);
+
+      for (int i = 0; i < t.length; i++) {
+        statement.execute(String.format(Locale.ENGLISH, insertTemplate, t[i], 
v[i]));
+        if ((i + 1) % 4 == 0) {
+          statement.execute("FLUSH");
+        }
+      }
+      statement.execute("FLUSH");
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void test3() {
+    prepareData3();
+
+    String res =
+        
"-1.2079272[0],1.101946[200],-0.523204[300],0.145359[500],-1.014322[700],"
+            + 
"0.532565[900],-0.122525[1200],-0.676077[1300],0.809559[1500],0.315869[1800],"
+            + "-0.413534[1900],-0.0211206[2100],";
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", 
"root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(
+              "SELECT min_value(s0)"
+                  // TODO not real min_value here, actually controlled by 
enableTri
+                  + " FROM root.vehicle.d0 group by ([100,2100),200ms)");
+      // (tn-t2)/(nout-2)=(2100-100)/(12-2)=2000/10=200
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int i = 0;
+        while (resultSet.next()) {
+          String ans = resultSet.getString(2);
+          // for LTTB all results are in the value string of MinValueAggrResult
+          // 因此对于LTTB来说,MinValueAggrResult的[t]也无意义
+          ans = ans.substring(0, ans.length() - 3);
+          System.out.println(ans);
+          Assert.assertEquals(res, ans);
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  private static void prepareData3() {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+
+      int[] t =
+          new int[] {
+            0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 
1300, 1400, 1500,
+            1600, 1700, 1800, 1900, 2000, 2100
+          };
+      double[] v =
+          new double[] {
+            -1.2079272,
+            -0.01120245,
+            1.1019456,
+            -0.52320362,
+            -0.35970289,
+            0.1453591,
+            -0.45947892,
+            -1.0143219,
+            0.81760821,
+            0.5325646,
+            -0.29532424,
+            -0.1469335,
+            -0.12252526,
+            -0.67607713,
+            -0.16967308,
+            0.8095585,
+            -0.78541944,
+            0.03221141,
+            0.31586886,
+            -0.41353356,
+            -0.21019539,
+            -0.0211206
+          };
+      config.setP1t(t[0]);
+      config.setP1v(v[0]);
+      config.setPnt(t[t.length - 1]);
+      config.setPnv(v[v.length - 1]);
+
+      for (int i = 0; i < t.length; i++) {
+        statement.execute(String.format(Locale.ENGLISH, insertTemplate, t[i], 
v[i]));
+        if ((i + 1) % 4 == 0) {
+          statement.execute("FLUSH");
+        }
+      }
+      statement.execute("FLUSH");
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+}


Reply via email to