This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch research/area-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f2f408ba529c373284bcb12798dc0c08605d25ee
Author: Lei Rui <[email protected]>
AuthorDate: Wed Jan 29 19:38:17 2025 +0800

    add
---
 server/sample_ltd-jar-with-dependencies.jar        | Bin 40785602 -> 40794458 bytes
 .../java/org/apache/iotdb/db/query/eBUG/LTD.java   | 282 ++++++++++++++-------
 .../org/apache/iotdb/db/query/eBUG/LTDBucket.java  |  68 +++++
 .../eBUG/{LTD.java => LTD_slow_deprecated.java}    |  37 ++-
 4 files changed, 279 insertions(+), 108 deletions(-)

diff --git a/server/sample_ltd-jar-with-dependencies.jar b/server/sample_ltd-jar-with-dependencies.jar
index f4b6c621844..db15b02bb3b 100644
Binary files a/server/sample_ltd-jar-with-dependencies.jar and b/server/sample_ltd-jar-with-dependencies.jar differ
diff --git a/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java b/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java
index 6eb2051411c..bd91813b720 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java
@@ -1,10 +1,7 @@
 package org.apache.iotdb.db.query.eBUG;
 
 import java.io.*;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
+import java.util.*;
 
 public class LTD {
   public static Point calculateAveragePoint(List<Point> points, int 
startClosed, int endClosed) {
@@ -52,108 +49,200 @@ public class LTD {
     return sse;
   }
 
-  public static List<Integer> getLtdBinIdxs(List<Point> points, int m, int 
maxIter, boolean debug) {
-    int numOfIterations = (int) (points.size() * 1.0 / m * 10);
+  public static List<Integer> getLtdBinIdxs(List<Point> points, int m, int 
maxIter, boolean debug)
+      throws IOException {
+    if (m < 6) {
+      throw new IOException("m at least 6");
+    }
+    int n = points.size();
+    int numOfIterations;
     if (maxIter >= 0) {
-      //      numOfIterations = Math.min(numOfIterations, maxIter);
       numOfIterations = maxIter; // overwrite
+    } else {
+      numOfIterations = (int) (n * 1.0 / m * 10);
     }
-    double blockSize = (points.size() - 3) * 1.0 / (m - 2);
+    if (debug) {
+      System.out.println("numOfIterations=" + numOfIterations);
+    }
+
+    int nbins = m - 2; // 留出全局首尾点
 
-    List<Integer> offset = new LinkedList<>();
-    for (double i = 1; i < points.size(); i += blockSize) {
+    double blockSize = (n - 3) * 1.0 / nbins;
+    List<Integer> offset = new LinkedList<>(); // nbins个分桶,nbins+1个桶边界
+    for (double i = 1; i < n; i += blockSize) {
       offset.add((int) i); // 1~n-2, 这样最后一个offset+1才不会超出边界
     }
     if (debug) {
-      System.out.println("numOfIterations=" + numOfIterations);
-      System.out.println(offset);
+      System.out.println("init bins: " + offset);
     }
 
-    List<Double> sse = new LinkedList<>();
-
-    // Initialization
-    for (int i = 0; i < m - 2; i++) {
-      // with one extra point overlapping for each adjacent bucket
-      sse.add(calculateSSEForBucket(points, offset.get(i) - 1, offset.get(i + 
1) + 1));
+    LTDBucket[] buckets = new LTDBucket[nbins];
+    double lastSSE = -1;
+    for (int i = 0; i < nbins - 1; i++) {
+      double sse;
+      if (i == 0) {
+        sse = calculateSSEForBucket(points, offset.get(i) - 1, offset.get(i + 
1) + 1);
+      } else {
+        sse = lastSSE;
+      }
+      double sse_next = calculateSSEForBucket(points, offset.get(i + 1) - 1, 
offset.get(i + 2) + 1);
+      double sum = sse + sse_next;
+      buckets[i] = new LTDBucket(offset.get(i), offset.get(i + 1), sse, sum);
+      lastSSE = sse_next;
+    }
+    // the last bucket sumOf2SSE set as infinity meaning never merged
+    buckets[nbins - 1] =
+        new LTDBucket(offset.get(nbins - 1), offset.get(nbins), lastSSE, 
Double.MAX_VALUE);
+
+    // 设置前后关系
+    LTDBucket starter = new LTDBucket(0, 0, 0, 0);
+    starter.next = buckets[0];
+    for (int i = 0; i < nbins; i++) {
+      buckets[i].prev = (i == 0 ? starter : buckets[i - 1]);
+      buckets[i].next = (i == nbins - 1 ? null : buckets[i + 1]);
     }
 
+    //        System.out.println("begin creating heap...");
+
+    // 使用优先队列构建
+    PriorityQueue<LTDBucket> splitHeap =
+        new PriorityQueue<>((p1, p2) -> Double.compare(p2.sse, p1.sse));
+    // 越大的排在前面
+    Collections.addAll(splitHeap, buckets);
+    PriorityQueue<LTDBucket> mergeHeap =
+        new PriorityQueue<>(Comparator.comparingDouble(p -> p.sumOf2SSE));
+    // 越小的排在前面
+    Collections.addAll(mergeHeap, buckets);
+
+    //        System.out.println("begin iterating...");
+
     for (int c = 0; c < numOfIterations; c++) {
-      // Find the bucket to be split
-      int maxSSEIndex = -1;
-      double maxSSE = Double.NEGATIVE_INFINITY;
-      for (int i = 0; i < m - 2; i++) {
-        if (offset.get(i + 1) - offset.get(i) <= 1) {
-          continue;
-        }
-        if (sse.get(i) > maxSSE) {
-          maxSSE = sse.get(i);
-          maxSSEIndex = i;
-        }
+      if (debug) {
+        System.out.println("--------------[" + c + "]----------------");
       }
-      if (maxSSEIndex < 0) {
-        if (debug) {
-          System.out.println(c);
-          System.out.println(maxSSEIndex);
-          System.out.println("break max");
-        }
+      if (splitHeap.isEmpty() || mergeHeap.isEmpty()) {
         break;
       }
-
-      // Find the buckets to be merged
-      int minSSEIndex = -1;
-      double minSSE = Double.POSITIVE_INFINITY;
-      for (int i = 0; i < m - 3; i++) {
-        if (i == maxSSEIndex || i + 1 == maxSSEIndex) {
-          continue;
-        }
-        if (sse.get(i) + sse.get(i + 1) < minSSE) {
-          minSSE = sse.get(i) + sse.get(i + 1);
-          minSSEIndex = i;
+      // Find the bucket to be split and buckets to be merged
+      LTDBucket bucket_split = splitHeap.poll();
+      if (bucket_split.isDeleted) {
+        c--;
+        continue;
+      }
+      LTDBucket buckets_merged = mergeHeap.poll();
+      if (buckets_merged.isDeleted) {
+        c--;
+        splitHeap.add(bucket_split); // 前面poll出来的未被删除的要加回去!
+        continue;
+      }
+      // TODO
+      if (bucket_split.startIdx == buckets_merged.startIdx) {
+        if (buckets_merged.next.next != null) {
+          buckets_merged = buckets_merged.next;
+        } else {
+          buckets_merged = buckets_merged.prev;
         }
       }
-      if (minSSEIndex < 0) {
-        if (debug) {
-          System.out.println(c);
-          System.out.println(minSSEIndex);
-          System.out.println("break min");
+      if (bucket_split.endIdx == buckets_merged.getMergedEndIdx()) {
+        if (buckets_merged.prev != starter) {
+          buckets_merged = buckets_merged.prev;
+        } else { // 假设nbins至少>=4, 于是buckets_merged.next.next不为null
+          buckets_merged = buckets_merged.next.next;
         }
-        break;
       }
 
-      // Split
-      int startIdx = offset.get(maxSSEIndex);
-      int endIdx = offset.get(maxSSEIndex + 1);
-      int middleIdx = (startIdx + endIdx) / 2;
-      offset.add(maxSSEIndex + 1, middleIdx);
-
-      // Update SSE affected by split
-      sse.set(
-          maxSSEIndex,
-          calculateSSEForBucket(
-              points, offset.get(maxSSEIndex) - 1, offset.get(maxSSEIndex + 1) 
+ 1));
-
-      double newSse =
-          calculateSSEForBucket(
-              points, offset.get(maxSSEIndex + 1) - 1, offset.get(maxSSEIndex 
+ 2) + 1);
+      // split
+      if (debug) {
+        System.out.println("+++To split bucket: " + bucket_split);
+      }
+      int startIdx = bucket_split.startIdx;
+      int endIdx = bucket_split.endIdx;
+      int middleIdx = (int) ((startIdx + endIdx) * 1.0 / 2);
+      double sse1 = calculateSSEForBucket(points, startIdx - 1, middleIdx + 1);
+      double sse2 = calculateSSEForBucket(points, middleIdx - 1, endIdx + 1);
+
+      LTDBucket newBucket =
+          new LTDBucket(middleIdx, endIdx, sse2, sse2 + 
bucket_split.getNextSSE());
+      newBucket.prev = bucket_split;
+      newBucket.next = bucket_split.next;
+      if (newBucket.next != null) {
+        newBucket.next.prev = newBucket;
+      }
+      bucket_split.next = newBucket;
+
+      bucket_split.isDeleted = true;
+      LTDBucket replaceBucket = new LTDBucket(bucket_split);
+      replaceBucket.endIdx = middleIdx;
+      replaceBucket.sse = sse1;
+      replaceBucket.sumOf2SSE = sse1 + sse2;
+      replaceBucket.next = newBucket;
+      // 更新前一个桶的sumOf2SSE 注意这意味着前一个桶也要更新heap!
+      if (replaceBucket.prev != starter) {
+        replaceBucket.prev.isDeleted = true;
+        LTDBucket preReplaceBucket = new LTDBucket(replaceBucket.prev);
+        preReplaceBucket.sumOf2SSE = preReplaceBucket.sse + sse1;
+        splitHeap.add(preReplaceBucket);
+        mergeHeap.add(preReplaceBucket);
+      }
 
-      sse.add(maxSSEIndex + 1, newSse);
+      splitHeap.add(newBucket);
+      splitHeap.add(replaceBucket);
+      mergeHeap.add(newBucket);
+      mergeHeap.add(replaceBucket);
 
-      // Merge
-      if (minSSEIndex > maxSSEIndex) {
-        minSSEIndex += 1; // Adjust index
+      if (debug) {
+        System.out.println("\tsplit into: " + replaceBucket + "," + newBucket);
       }
-      offset.remove(minSSEIndex + 1);
-
-      sse.set(
-          minSSEIndex,
-          calculateSSEForBucket(
-              points, offset.get(minSSEIndex) - 1, offset.get(minSSEIndex + 1) 
+ 1));
 
-      sse.remove(minSSEIndex + 1);
+      // merge
+      if (debug) {
+        System.out.println("---To merge bucket: " + buckets_merged + "," + 
buckets_merged.next);
+      }
+      startIdx = buckets_merged.startIdx;
+      endIdx = buckets_merged.getMergedEndIdx();
+      double sse3 = calculateSSEForBucket(points, startIdx - 1, endIdx + 1);
+      buckets_merged.isDeleted = true;
+      buckets_merged.next.isDeleted = true;
+      LTDBucket mergedBucket = new LTDBucket(buckets_merged);
+      mergedBucket.endIdx = endIdx;
+      mergedBucket.sse = sse3;
+      mergedBucket.next = buckets_merged.next.next;
+      if (mergedBucket.next != null) {
+        mergedBucket.next.prev = mergedBucket;
+      }
+      // 更新自己的sumOf2SSE
+      mergedBucket.sumOf2SSE = sse3 + mergedBucket.getNextSSE(); // 
如果next为null,这一项就是Infinity
+      // 更新前一个分桶的sumOf2SSE 注意这意味着前一个桶也要更新heap!
+      if (mergedBucket.prev != starter) {
+        mergedBucket.prev.isDeleted = true;
+        LTDBucket preReplaceBucket = new LTDBucket(mergedBucket.prev);
+        preReplaceBucket.sumOf2SSE = preReplaceBucket.sse + sse3;
+        splitHeap.add(preReplaceBucket);
+        mergeHeap.add(preReplaceBucket);
+      }
+      splitHeap.add(mergedBucket);
+      mergeHeap.add(mergedBucket);
+      if (debug) {
+        System.out.println("\tmerged bucket: " + mergedBucket);
+      }
+      //            System.out.println("----------" + splitHeap.size() + 
"-------------");
+      //            System.out.println("----------" + mergeHeap.size() + 
"-------------");
     }
 
-    // Convert ArrayList to int[] and return
-    return offset;
+    List<Integer> res = new ArrayList<>();
+    LTDBucket bucket = starter.next;
+    while (true) {
+      res.add(bucket.startIdx);
+      if (bucket.next == null) {
+        res.add(bucket.endIdx); // 收尾
+        break;
+      }
+      bucket = bucket.next;
+    }
+    if (debug) {
+      System.out.println(res);
+    }
+    return res;
   }
 
   public static List<Point> LTTB(List<Point> points, List<Integer> bins) {
@@ -185,25 +274,26 @@ public class LTD {
     return res;
   }
 
-  public static List<Point> LTD(List<Point> points, int m, int maxIter, 
boolean debug) {
+  public static List<Point> LTD(List<Point> points, int m, int maxIter, 
boolean debug)
+      throws IOException {
     List<Integer> bins = getLtdBinIdxs(points, m, maxIter, debug);
     return LTTB(points, bins);
   }
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws IOException {
     Random rand = new Random(10);
     String input = "D:\\datasets\\regular\\tmp2.csv";
     boolean hasHeader = true;
     int timeIdx = 0;
     int valueIdx = 1;
-    int N = 10000;
-    List<Point> points = Tool.readFromFile(input, hasHeader, timeIdx, 
valueIdx, N);
-    //        Polyline polyline = new Polyline();
-    //        for (int i = 0; i < N; i += 1) {
-    //            double v = rand.nextInt(1000);
-    //            polyline.addVertex(new Point(i, v));
-    //        }
-    //        List<Point> points = polyline.getVertices();
+    int N = 1000_0;
+    //        List<Point> points = Tool.readFromFile(input, hasHeader, 
timeIdx, valueIdx, N);
+    Polyline polyline = new Polyline();
+    for (int i = 0; i < N; i += 1) {
+      double v = rand.nextInt(1000);
+      polyline.addVertex(new Point(i, v));
+    }
+    List<Point> points = polyline.getVertices();
     try (FileWriter writer = new FileWriter("raw.csv")) {
       // 写入CSV头部
       writer.append("x,y,z\n");
@@ -217,17 +307,17 @@ public class LTD {
       System.out.println("Error writing to CSV file: " + e.getMessage());
     }
 
-    int m = 10;
-    int maxIter = 10;
+    int m = 1000;
+    int maxIter = -1;
     long startTime = System.currentTimeMillis();
     //        List<Integer> bins = getLtdBinIdxs(points, m, true);
-    List<Point> sampled = LTD(points, m, maxIter, true);
+    List<Point> sampled = LTD(points, m, maxIter, false);
     long endTime = System.currentTimeMillis();
     System.out.println("Time taken: " + (endTime - startTime) + "ms");
 
-    for (Point p : sampled) {
-      System.out.println(p);
-    }
+    //    for (Point p : sampled) {
+    //      System.out.println(p);
+    //    }
 
     try (PrintWriter writer = new PrintWriter(new File("output.csv"))) {
       // 写入字符串
diff --git a/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTDBucket.java b/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTDBucket.java
new file mode 100644
index 00000000000..59998b599ea
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTDBucket.java
@@ -0,0 +1,68 @@
+package org.apache.iotdb.db.query.eBUG;
+
+public class LTDBucket {
+
+  public int startIdx, endIdx;
+  public double sse, sumOf2SSE;
+
+  public LTDBucket prev;
+  public LTDBucket next;
+
+  public boolean isDeleted;
+
+  public LTDBucket(int startIdx, int endIdx, double sse, double sumOf2SSE) {
+    this.startIdx = startIdx;
+    this.endIdx = endIdx;
+    this.sse = sse;
+    this.sumOf2SSE = sumOf2SSE;
+    this.prev = null;
+    this.next = null;
+    this.isDeleted = false;
+  }
+
+  public LTDBucket(LTDBucket ltdBucket) {
+    this.startIdx = ltdBucket.startIdx;
+    this.endIdx = ltdBucket.endIdx;
+    this.sse = ltdBucket.sse;
+    this.sumOf2SSE = ltdBucket.sumOf2SSE;
+    this.prev = ltdBucket.prev;
+    this.next = ltdBucket.next;
+    this.isDeleted = false;
+
+    if (ltdBucket.prev != null) {
+      ltdBucket.prev.next = this;
+    }
+    if (ltdBucket.next != null) {
+      ltdBucket.next.prev = this;
+    }
+  }
+
+  public int getStartIdx() {
+    return startIdx;
+  }
+
+  public int getEndIdx() {
+    return endIdx;
+  }
+
+  public int getMergedEndIdx() {
+    if (this.next == null) {
+      return this.endIdx;
+    } else {
+      return this.next.endIdx;
+    }
+  }
+
+  public double getNextSSE() {
+    if (this.next == null) {
+      return Double.MAX_VALUE;
+    } else {
+      return this.next.sse;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "[" + startIdx + ", " + endIdx + "]: " + sse + "," + sumOf2SSE;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java b/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD_slow_deprecated.java
similarity index 89%
copy from server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java
copy to server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD_slow_deprecated.java
index 6eb2051411c..7746a5c8f8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/eBUG/LTD_slow_deprecated.java
@@ -6,7 +6,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 
-public class LTD {
+public class LTD_slow_deprecated {
   public static Point calculateAveragePoint(List<Point> points, int 
startClosed, int endClosed) {
     double sumX = 0;
     double sumY = 0;
@@ -78,6 +78,9 @@ public class LTD {
     }
 
     for (int c = 0; c < numOfIterations; c++) {
+      if (debug) {
+        System.out.println("--------------[" + c + "]----------------");
+      }
       // Find the bucket to be split
       int maxSSEIndex = -1;
       double maxSSE = Double.NEGATIVE_INFINITY;
@@ -123,6 +126,9 @@ public class LTD {
       // Split
       int startIdx = offset.get(maxSSEIndex);
       int endIdx = offset.get(maxSSEIndex + 1);
+      if (debug) {
+        System.out.println("+++To split bucket: " + startIdx + "," + endIdx + 
":" + maxSSE);
+      }
       int middleIdx = (startIdx + endIdx) / 2;
       offset.add(maxSSEIndex + 1, middleIdx);
 
@@ -144,6 +150,10 @@ public class LTD {
       }
       offset.remove(minSSEIndex + 1);
 
+      if (debug) {
+        System.out.println("---To merge bucket: " + offset.get(minSSEIndex) + 
":" + minSSE);
+      }
+
       sse.set(
           minSSEIndex,
           calculateSSEForBucket(
@@ -153,6 +163,9 @@ public class LTD {
     }
 
     // Convert ArrayList to int[] and return
+    if (debug) {
+      System.out.println(offset);
+    }
     return offset;
   }
 
@@ -196,14 +209,14 @@ public class LTD {
     boolean hasHeader = true;
     int timeIdx = 0;
     int valueIdx = 1;
-    int N = 10000;
-    List<Point> points = Tool.readFromFile(input, hasHeader, timeIdx, 
valueIdx, N);
-    //        Polyline polyline = new Polyline();
-    //        for (int i = 0; i < N; i += 1) {
-    //            double v = rand.nextInt(1000);
-    //            polyline.addVertex(new Point(i, v));
-    //        }
-    //        List<Point> points = polyline.getVertices();
+    int N = 100_0000;
+    //        List<Point> points = Tool.readFromFile(input, hasHeader, 
timeIdx, valueIdx, N);
+    Polyline polyline = new Polyline();
+    for (int i = 0; i < N; i += 1) {
+      double v = rand.nextInt(1000);
+      polyline.addVertex(new Point(i, v));
+    }
+    List<Point> points = polyline.getVertices();
     try (FileWriter writer = new FileWriter("raw.csv")) {
       // 写入CSV头部
       writer.append("x,y,z\n");
@@ -217,11 +230,11 @@ public class LTD {
       System.out.println("Error writing to CSV file: " + e.getMessage());
     }
 
-    int m = 10;
-    int maxIter = 10;
+    int m = 1000;
+    int maxIter = -1;
     long startTime = System.currentTimeMillis();
     //        List<Integer> bins = getLtdBinIdxs(points, m, true);
-    List<Point> sampled = LTD(points, m, maxIter, true);
+    List<Point> sampled = LTD(points, m, maxIter, false);
     long endTime = System.currentTimeMillis();
     System.out.println("Time taken: " + (endTime - startTime) + "ms");
 

Reply via email to