This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch research/LTS-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9cb402f77e4b799826a426a5f13582a97054ebe3
Author: Lei Rui <[email protected]>
AuthorDate: Sun Jul 14 15:09:55 2024 +0800

    add
---
 .../resources/conf/iotdb-engine.properties         |   4 +-
 .../groupby/GroupByWithoutValueFilterDataSet.java  |   3 +
 .../groupby/LocalGroupByExecutorTri_FSW.java       | 163 +++++++++++++++++++++
 .../org/apache/iotdb/db/query/simpiece/FSW.java    |  98 +++++++++++++
 .../iotdb/db/query/simpiece/MySample_fsw.java      |  78 ++++++++++
 5 files changed, 344 insertions(+), 2 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties 
b/server/src/assembly/resources/conf/iotdb-engine.properties
index 18bcfd54fa5..7cc6cfcc97a 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -19,10 +19,10 @@
 ####################
 ### enable Tri
 ####################
-# MinMax, MinMaxLTTB, M4, LTTB, ILTS, SimPiece, SC
+# MinMax, MinMaxLTTB, M4, LTTB, ILTS, SimPiece, SC, FSW
 enable_Tri=MinMax
 
-# SimPiece segment error threshold
+# segment error threshold for SimPiece, SC, FSW
 epsilon=100
 
 #for
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 6a4f559c775..5818d4b6a04 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -441,6 +441,9 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
     } else if (CONFIG.getEnableTri().equals("SC")) {
       return new LocalGroupByExecutorTri_SC(
           path, allSensors, dataType, context, timeFilter, fileFilter, 
ascending);
+    } else if (CONFIG.getEnableTri().equals("FSW")) {
+      return new LocalGroupByExecutorTri_FSW(
+          path, allSensors, dataType, context, timeFilter, fileFilter, 
ascending);
     } else {
       logger.info("No matched enable_tri!");
       return new LocalGroupByExecutor(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_FSW.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_FSW.java
new file mode 100644
index 00000000000..699f88dd824
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_FSW.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.impl.MinValueAggrResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.SeriesReader;
+import org.apache.iotdb.db.query.simpiece.FSW;
+import org.apache.iotdb.db.query.simpiece.Point;
+import org.apache.iotdb.db.query.simpiece.TimeSeries;
+import org.apache.iotdb.db.query.simpiece.TimeSeriesReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo;
+import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+
+public class LocalGroupByExecutorTri_FSW implements GroupByExecutor {
+
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
+  // Aggregate result buffer of this path
+  private final List<AggregateResult> results = new ArrayList<>();
+
+  TimeSeries timeSeries;
+
+  double epsilon = CONFIG.getEpsilon();
+
+  public LocalGroupByExecutorTri_FSW(
+      PartialPath path,
+      Set<String> allSensors,
+      TSDataType dataType,
+      QueryContext context,
+      Filter timeFilter,
+      TsFileFilter fileFilter,
+      boolean ascending)
+      throws StorageEngineException, QueryProcessException {
+    //    long start = System.nanoTime();
+
+    // get all data sources
+    QueryDataSource queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, 
timeFilter);
+
+    // update filter by TTL
+    //    this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    SeriesReader seriesReader =
+        new SeriesReader(
+            path,
+            allSensors,
+            // fix bug: here use the aggregation type as the series data type,
+            // not using pageReader.getAllSatisfiedPageData is ok
+            dataType,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            fileFilter,
+            ascending);
+
+    try {
+      // TODO: this might be bad to load all chunk metadata at first
+      List<ChunkSuit4Tri> futureChunkList = new ArrayList<>();
+      futureChunkList.addAll(seriesReader.getAllChunkMetadatas4Tri());
+      // order futureChunkList by chunk startTime
+      futureChunkList.sort(
+          new Comparator<ChunkSuit4Tri>() {
+            public int compare(ChunkSuit4Tri o1, ChunkSuit4Tri o2) {
+              return ((Comparable) (o1.chunkMetadata.getStartTime()))
+                  .compareTo(o2.chunkMetadata.getStartTime());
+            }
+          });
+
+      GroupByFilter groupByFilter = (GroupByFilter) timeFilter;
+      long startTime = groupByFilter.getStartTime();
+      long endTime = groupByFilter.getEndTime();
+
+      timeSeries = TimeSeriesReader.getTimeSeriesFromTsFiles(futureChunkList, 
startTime, endTime);
+
+    } catch (IOException e) {
+      throw new QueryProcessException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult) {
+    results.add(aggrResult);
+  }
+
+  @Override
+  public List<AggregateResult> calcResult(
+      long curStartTime, long curEndTime, long startTime, long endTime, long 
interval)
+      throws IOException {
+    // group by curStartTime and curEndTime are not used in FSW 
segmentation
+
+    StringBuilder series = new StringBuilder();
+
+    // clear result cache
+    for (AggregateResult result : results) {
+      result.reset();
+    }
+
+    // FSW (feasible space window) point reduction
+    int length = timeSeries.length();
+    List<Point> points = timeSeries.data;
+
+    // FSW: reduce the series to the points kept by feasible-space-window segmentation
+    List<Point> reducedPoints = FSW.reducePoints(timeSeries.data, epsilon);
+
+    for (Point p : reducedPoints) {
+      
series.append(p.getValue()).append("[").append(p.getTimestamp()).append("]").append(",");
+    }
+
+    MinValueAggrResult minValueAggrResult = (MinValueAggrResult) 
results.get(0);
+    minValueAggrResult.updateResult(new MinMaxInfo<>(series.toString(), 0));
+    return results;
+  }
+
+  @Override
+  public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long 
nextEndTime)
+      throws IOException {
+    throw new IOException("no implemented");
+  }
+
+  @Override
+  public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    throw new IOException("no implemented");
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/simpiece/FSW.java 
b/server/src/main/java/org/apache/iotdb/db/query/simpiece/FSW.java
new file mode 100644
index 00000000000..4b2e0163e1a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/simpiece/FSW.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.simpiece;
+
+import java.util.ArrayList;
+import java.util.List;
+
+// Liu, Xiaoyan, Lin, Zhenjiang, Wang, Huaiqing, 2008. Novel online methods 
for time series
+// segmentation.
+// FSW: feasible space window
+// adapted from https://github.com/zhourongleiden/AEPLA.git
+public class FSW {
+
+  public static List<Point> reducePoints(List<Point> points, double epsilon) {
+    int length = points.size();
+
+    // Precompute upper and lower bounds
+    double[] p_upper = new double[length];
+    double[] p_lower = new double[length];
+    for (int j = 0; j < length; j++) {
+      double v = points.get(j).getValue();
+      p_upper[j] = v + epsilon;
+      p_lower[j] = v - epsilon;
+    }
+
+    // init the first segment
+    int i = 0;
+    int seg_no = 0;
+    int csp_id = 0;
+    List<int[]> segmentPoint = new ArrayList<>();
+    segmentPoint.add(new int[] {0, 0});
+    double upSlope = Double.POSITIVE_INFINITY;
+    double lowSlope = Double.NEGATIVE_INFINITY;
+
+    while (i < length - 1) {
+      i++;
+      upSlope =
+          Math.min(
+              upSlope,
+              ((p_upper[i] - 
points.get(segmentPoint.get(seg_no)[1]).getValue())
+                  / (points.get(i).getTimestamp()
+                      - 
points.get(segmentPoint.get(seg_no)[1]).getTimestamp())));
+      //              (i - segmentPoint.get(seg_no)[1])));
+      lowSlope =
+          Math.max(
+              lowSlope,
+              ((p_lower[i] - 
points.get(segmentPoint.get(seg_no)[1]).getValue())
+                  / (points.get(i).getTimestamp()
+                      - 
points.get(segmentPoint.get(seg_no)[1]).getTimestamp())));
+      //              (i - segmentPoint.get(seg_no)[1])));
+      if (upSlope < lowSlope) {
+        seg_no += 1;
+        segmentPoint.add(new int[] {seg_no, csp_id});
+        i = csp_id;
+        upSlope = Double.POSITIVE_INFINITY;
+        lowSlope = Double.NEGATIVE_INFINITY;
+      } else {
+        double s =
+            (points.get(i).getValue() - 
points.get(segmentPoint.get(seg_no)[1]).getValue())
+                / (points.get(i).getTimestamp()
+                    - points.get(segmentPoint.get(seg_no)[1]).getTimestamp());
+        //                (i - segmentPoint.get(seg_no)[1]);
+        if (s >= lowSlope && s <= upSlope) {
+          csp_id = i;
+        }
+      }
+    }
+
+    // deal with the last segment
+    if (segmentPoint.get(seg_no)[1] < length - 1) {
+      seg_no += 1;
+      segmentPoint.add(new int[] {seg_no, length - 1});
+    }
+
+    List<Point> result = new ArrayList<>();
+    for (int[] ints : segmentPoint) {
+      result.add(points.get(ints[1]));
+    }
+    return result;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/simpiece/MySample_fsw.java 
b/server/src/main/java/org/apache/iotdb/db/query/simpiece/MySample_fsw.java
new file mode 100644
index 00000000000..854962e96a6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/simpiece/MySample_fsw.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Sim-Piece code forked from https://github.com/xkitsios/Sim-Piece.git
+
+package org.apache.iotdb.db.query.simpiece;
+
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.List;
+
+public class MySample_fsw {
+
+  public static void main(String[] args) {
+    String fileDir = "D:\\desktop\\NISTPV\\";
+    boolean series = true; // number columns from 1 instead of using the timestamp column
+    String[] datasetNameList =
+        new String[] {
+          //                      "NISTPV-Ground-2015-Qloss_Ah",
+          "NISTPV-Ground-2015-Pyra1_Wm2",
+          //          "NISTPV-Ground-2015-RTD_C_3"
+        };
+
+    int[] noutList = new int[] {0};
+
+    double[] r = new double[] {0.1, 0.5, 1.3};
+    for (int y = 0; y < datasetNameList.length; y++) {
+      String datasetName = datasetNameList[y];
+      //      int start = (int) (10000000 / 2 - 2500000 * r[y]); // 从0开始计数
+      //      int end = (int) (10000000 / 2 + 2500000 * (1 - r[y]));
+      //      int N = end - start; // -1 for all
+
+      int start = 0;
+      int end = 10000;
+      int N = end - start;
+
+      for (int nout : noutList) {
+        // apply FSW on the input file, outputting nout points saved in 
csvFile
+        boolean hasHeader = false;
+        try (FileInputStream inputStream = new FileInputStream(fileDir + 
datasetName + ".csv")) {
+          String delimiter = ",";
+          TimeSeries ts =
+              TimeSeriesReader.getMyTimeSeries(
+                  inputStream, delimiter, false, N, start, hasHeader, series);
+
+          double epsilon = 0.1;
+          List<Point> reducedPoints = FSW.reducePoints(ts.data, epsilon);
+          try (PrintWriter writer =
+              new PrintWriter(
+                  new FileWriter(datasetName + "-" + N + "-" + 
reducedPoints.size() + "-sc.csv"))) {
+            for (Point p : reducedPoints) {
+              writer.println(p.getTimestamp() + "," + p.getValue());
+            }
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+}

Reply via email to