This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/LTS-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cd8606952391555b8eee92ce76e02c49a2986bcf Author: Lei Rui <[email protected]> AuthorDate: Sat Feb 3 04:55:36 2024 +0800 ch --- .../groupby/LocalGroupByExecutorTri_ILTS.java | 218 ++++++++++++++++++--- .../iotdb/db/integration/tri/MyTest_ILTS.java | 3 +- .../apache/iotdb/db/integration/tri/MyTest_ch.java | 24 ++- .../file/metadata/statistics/QuickHullPoint.java | 10 + .../iotdb/tsfile/read/common/IOMonitor2.java | 42 ++++ 5 files changed, 265 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java index 29ec858ff9e..f31938e8dfb 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_ILTS.java @@ -36,6 +36,7 @@ import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; +import org.apache.iotdb.tsfile.file.metadata.statistics.QuickHullPoint; import org.apache.iotdb.tsfile.read.common.ChunkSuit4Tri; import org.apache.iotdb.tsfile.read.common.IOMonitor2; import org.apache.iotdb.tsfile.read.filter.GroupByFilter; @@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -379,7 +381,39 @@ public class LocalGroupByExecutorTri_ILTS implements GroupByExecutor { PageReader pageReader = chunkSuit4Tri.pageReader; // TODO: 用凸包bitmap加速 // 如果块被分桶边界切开,那还是逐点遍历 - // 否则块完整落在桶内时,用凸包规则快速找到这个块中距离lr连线最远的点,最后和全局当前最远结果点比较 + // 否则块完整落在桶内时,用凸包规则快速找到这个块中沿着lr连线法向量最高和最低的点,最后和全局当前最远结果点比较 + // 也可以改成先不管是不是完整落在桶里,先找到最高低点,然后如果这两个点没有当前已知最远点远那就可以排除了, + // 否则如果最远但是不在当前桶里那还是要遍历,否则最远且在桶里就可以更新当前已知最远点。 + // 目前先这样只管完全落在桶里的 + if (CONFIG.isAcc_convex() + && chunkSuit4Tri.chunkMetadata.getStartTime() >= localCurStartTime + && chunkSuit4Tri.chunkMetadata.getEndTime() < localCurEndTime + && chunkSuit4Tri.chunkMetadata.getStatistics().getCount() >= 3 // 不考虑少于三个点 + ) { + BitSet bitSet = chunkSuit4Tri.chunkMetadata.getStatistics().getQuickHullBitSet(); + List<QuickHullPoint> foundPoints = + convexHullAcc( + lt, + lv, + rt, + rv, + pageReader, + bitSet, + chunkSuit4Tri.chunkMetadata.getStatistics().getCount()); // 有可能不止两个点,当一边是平行线两端点 + // System.out.println(foundPoints); + for (QuickHullPoint point : foundPoints) { + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; + double distance = IOMonitor2.calculateDistance(lt, lv, point.t, point.v, rt, rv); + if (distance > maxDistance) { + // 是不是因为开启了acc_rect之后,导致这里要遍历的chunk块里没有点的距离可以达到maxDistance + // 从而acc_convex不会生效?! + maxDistance = distance; + select_t = point.t; + select_v = point.v; + } + } + continue; // note this + } int count = chunkSuit4Tri.chunkMetadata.getStatistics().getCount(); int j; for (j = 0; j < count; j++) { @@ -393,39 +427,16 @@ public class LocalGroupByExecutorTri_ILTS implements GroupByExecutor { ByteBuffer valueBuffer = pageReader.valueBuffer; double v = valueBuffer.getDouble(pageReader.timeBufferLength + j * 8); double distance = IOMonitor2.calculateDistance(lt, lv, timestamp, v, rt, rv); - - // TODO 下面假装已经有凸包剪枝,先实验看看如果跳过一些点不用遍历有多少加速效果 - if (CONFIG.isAcc_convex()) { - // if (chunkSuit4Tri.chunkMetadata.getEndTime() < localCurEndTime) { - // chunkSuit4Tri.pageReader = null; - // } + if (distance > maxDistance) { + // 是不是因为开启了acc_rect之后,导致这里要遍历的chunk块里没有点的距离可以达到maxDistance + // 从而acc_convex不会生效?! + maxDistance = distance; select_t = timestamp; select_v = v; - break; - - } else { - if (distance > maxDistance) { - // TODO 是不是因为开启了acc_rect之后,导致这里要遍历的chunk块里没有点的距离可以达到maxDistance - // 从而acc_convex不会生效?! - maxDistance = distance; - select_t = timestamp; - select_v = v; - } } } } - // // clear for heap space - // if (j >= count) { - // // 代表这个chunk已经读完了,后面的bucket不会再用到,所以现在就可以清空内存的page - // // 而不是等到下一个bucket的时候再清空,因为有可能currentChunkList里chunks太多,page点同时存在太多,heap - // space不够 - // chunkSuit4Tri.pageReader = null; - // // TODO 但是这样有可能导致下一轮迭代到这个桶的时候又要读一遍这个chunk - // // 但是不这样做的话相当于一轮迭代之后几乎所有的点都加载到内存留着了 - // // 还要注意的是如果被rectangle提前剪枝掉了就不会走到这一步,也就是说那个chunk的pageReader可能还留着 - // } - } - + } // 遍历与当前桶有overlap的chunks结束 // // 记录结果 // TODO debug // series.append(select_v).append("[").append(select_t).append("]").append(","); @@ -479,6 +490,155 @@ public class LocalGroupByExecutorTri_ILTS implements GroupByExecutor { return results; } + // 在convex hull加速下找到沿着lr连线法向量方向最高和最低的pageReader的buffer中的两个位置 + // 不考虑少于三个点的情况 + public List<QuickHullPoint> convexHullAcc( + double lt, double lv, double rt, double rv, PageReader pageReader, BitSet bitSet, int count) { + // 连接左右固定点的线的法向量(A,B) + double A = lv - rv; + double B = rt - lt; + + BitSet reverseBitSet = IOMonitor2.reverse(bitSet); + + // for (int i = bitSet.nextSetBit(0); i != -1; i = bitSet.nextSetBit(i + 1)) { + // indexes.add(i); + // } + + long fpt = pageReader.timeBuffer.getLong(0); + double fpv = pageReader.valueBuffer.getDouble(pageReader.timeBufferLength); + long lpt = pageReader.timeBuffer.getLong((count - 1) * 8); + double lpv = pageReader.valueBuffer.getDouble(pageReader.timeBufferLength + (count - 1) * 8); + + List<QuickHullPoint> LU = new ArrayList<>(); + List<QuickHullPoint> LL = new ArrayList<>(); + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; + int bitSetIdx = bitSet.nextSetBit(1); // 0 must true + long t = pageReader.timeBuffer.getLong(bitSetIdx * 8); + double v = pageReader.valueBuffer.getDouble(pageReader.timeBufferLength + bitSetIdx * 8); + int check = IOMonitor2.pointLocation(fpt, fpv, t, v, lpt, lpv); // note this, not l & r! + if (check > 0) { // p above the line connecting FP&LP + // init LU clockwise + LU.add(new QuickHullPoint(fpt, fpv)); + LU.add(new QuickHullPoint(t, v)); + // init LL counterclockwise + LL.add(new QuickHullPoint(t, v)); + LL.add(new QuickHullPoint(fpt, fpv)); + } else { // p below the line connecting FP&LP + // init LU clockwise + LU.add(new QuickHullPoint(t, v)); + LU.add(new QuickHullPoint(fpt, fpv)); + // init LL counterclockwise + LL.add(new QuickHullPoint(fpt, fpv)); + LL.add(new QuickHullPoint(t, v)); + } + + List<QuickHullPoint> RU = new ArrayList<>(); + List<QuickHullPoint> RL = new ArrayList<>(); + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; + int reverseBitSetIdx = reverseBitSet.nextSetBit(1); // 0 must true + t = pageReader.timeBuffer.getLong((count - reverseBitSetIdx - 1) * 8); // note this reverse + v = + pageReader.valueBuffer.getDouble( + pageReader.timeBufferLength + (count - reverseBitSetIdx - 1) * 8); + check = IOMonitor2.pointLocation(fpt, fpv, t, v, lpt, lpv); // note this, not l & r! + if (check > 0) { // p above the line connecting FP&LP + // init RU counterclockwise + RU.add(new QuickHullPoint(lpt, lpv)); + RU.add(new QuickHullPoint(t, v)); + // init RL clockwise + RL.add(new QuickHullPoint(t, v)); + RL.add(new QuickHullPoint(lpt, lpv)); + } else { // p below the line connecting FP&LP + // init RU counterclockwise + RU.add(new QuickHullPoint(t, v)); + RU.add(new QuickHullPoint(lpt, lpv)); + // init RL clockwise + RL.add(new QuickHullPoint(lpt, lpv)); + RL.add(new QuickHullPoint(t, v)); + } + + boolean findHighest = false; + boolean findLowest = false; + List<QuickHullPoint> foundPoints = new ArrayList<>(); + + while (bitSetIdx != -1 || reverseBitSetIdx != -1) { // TODO 判断如果只有两个点,会不会一直循环出不来 + // 如果两个都是-1说明两边都找完所有的点了 + // from left to right + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; + bitSetIdx = bitSet.nextSetBit(bitSetIdx + 1); + if (bitSetIdx != -1) { + t = pageReader.timeBuffer.getLong(bitSetIdx * 8); + v = pageReader.valueBuffer.getDouble(pageReader.timeBufferLength + bitSetIdx * 8); + check = IOMonitor2.pointLocation(fpt, fpv, t, v, lpt, lpv); // note this, not l & r! + // note that below if check equals 0, that point is added to both upper and lower, which is + // necessary + if (check >= 0) { // p above or on the line connecting FP&LP + LU.add(new QuickHullPoint(t, v)); + int sign = IOMonitor2.checkSumSigns(A, B, LU); + if (sign > 0) { + findLowest = true; + foundPoints.add(LU.get(LU.size() - 2)); // 注意是倒数第二个点,不是最后一个 + } else if (sign < 0) { + findHighest = true; + foundPoints.add(LU.get(LU.size() - 2)); // 注意是倒数第二个点,不是最后一个 + } + } + if (check <= 0) { // p below or on the line connecting FP&LP + LL.add(new QuickHullPoint(t, v)); + int sign = IOMonitor2.checkSumSigns(A, B, LL); + if (sign > 0) { + findLowest = true; + foundPoints.add(LL.get(LL.size() - 2)); // 注意是倒数第二个点,不是最后一个 + } else if (sign < 0) { + findHighest = true; + foundPoints.add(LL.get(LL.size() - 2)); // 注意是倒数第二个点,不是最后一个 + } + } + if (findLowest && findHighest) { + break; + } + } + // from right to left + IOMonitor2.DCP_D_getAllSatisfiedPageData_traversedPointNum++; + reverseBitSetIdx = bitSet.nextSetBit(reverseBitSetIdx + 1); + if (reverseBitSetIdx != -1) { + t = pageReader.timeBuffer.getLong((count - reverseBitSetIdx - 1) * 8); + v = + pageReader.valueBuffer.getDouble( + pageReader.timeBufferLength + (count - reverseBitSetIdx - 1) * 8); + // note that below if check equals 0, that point is added to both upper and lower, which is + // necessary + check = IOMonitor2.pointLocation(fpt, fpv, t, v, lpt, lpv); // note this, not l & r! + if (check >= 0) { // p is above or on the line connecting FP&LP + RU.add(new QuickHullPoint(t, v)); + int sign = IOMonitor2.checkSumSigns(A, B, RU); + if (sign > 0) { + findLowest = true; + foundPoints.add(RU.get(RU.size() - 2)); // 注意是倒数第二个点,不是最后一个 + } else if (sign < 0) { + findHighest = true; + foundPoints.add(RU.get(RU.size() - 2)); // 注意是倒数第二个点,不是最后一个 + } + } + if (check <= 0) { // p below or on the line connecting FP&LP + RL.add(new QuickHullPoint(t, v)); + int sign = IOMonitor2.checkSumSigns(A, B, RL); + if (sign > 0) { + findLowest = true; + foundPoints.add(RL.get(RL.size() - 2)); // 注意是倒数第二个点,不是最后一个 + } else if (sign < 0) { + findHighest = true; + foundPoints.add(RL.get(RL.size() - 2)); // 注意是倒数第二个点,不是最后一个 + } + } + if (findLowest && findHighest) { + break; + } + } + } + return foundPoints; + } + @Override public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime) throws IOException { diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java index 2e7a3331172..e7695b90e10 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ILTS.java @@ -81,7 +81,7 @@ public class MyTest_ILTS { // config.setNumIterations(4); config.setAcc_avg(true); config.setAcc_rectangle(true); - config.setAcc_convex(false); + config.setAcc_convex(true); config.setAcc_iterRepeat(true); config.setEnableCPV(false); @@ -123,6 +123,7 @@ public class MyTest_ILTS { Assert.assertEquals(res, ans); } } + System.out.println(((IoTDBStatement) statement).executeFinish()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ch.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ch.java index f2cd1dd08c0..42c6f27be0c 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ch.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_ch.java @@ -4,7 +4,9 @@ import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.apache.iotdb.tsfile.file.metadata.statistics.QuickHull; import org.apache.iotdb.tsfile.file.metadata.statistics.QuickHullPoint; +import org.apache.iotdb.tsfile.read.common.IOMonitor2; +import org.apache.commons.collections4.queue.CircularFifoQueue; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -16,6 +18,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.BitSet; +import java.util.Queue; public class MyTest_ch { @@ -42,9 +45,19 @@ public class MyTest_ch { points.add(new QuickHullPoint(5, 2, 5)); points.add(new QuickHullPoint(100, 2, 6)); + // points.add(new QuickHullPoint(0, 0, 0)); + // points.add(new QuickHullPoint(1, 0, 1)); + // points.add(new QuickHullPoint(2, 0, 2)); + // points.add(new QuickHullPoint(3, 0, 3)); + // points.add(new QuickHullPoint(4, 0, 4)); + // points.add(new QuickHullPoint(5, 0, 5)); + // points.add(new QuickHullPoint(100, 0, 6)); + BitSet bitSet1 = QuickHull.quickHull(points); System.out.println(bitSet1); - // System.out.println(bitSet1.size()); + System.out.println(bitSet1.nextSetBit(100)); + System.out.println(bitSet1.length()); + System.out.println(IOMonitor2.reverse(bitSet1)); ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); @@ -52,7 +65,7 @@ public class MyTest_ch { oos.flush(); byte[] bytes = baos.toByteArray(); // byte[] bytes = bitSet1.toByteArray(); - System.out.println(bytes.length); + // System.out.println(bytes.length); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bais); @@ -61,6 +74,13 @@ public class MyTest_ch { // System.out.println(bitSet.size()); Assert.assertTrue(bitSet.get(1)); Assert.assertFalse(bitSet.get(3)); + + Queue<QuickHullPoint> LU = new CircularFifoQueue<>(3); + LU.add(new QuickHullPoint(0, 0)); + LU.add(new QuickHullPoint(1, 0)); + LU.add(new QuickHullPoint(0, 0)); + LU.add(new QuickHullPoint(2, 0)); + System.out.println(LU); } // public static void main(String args[]) throws Exception { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/QuickHullPoint.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/QuickHullPoint.java index 8709d14ea01..0729410b7cb 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/QuickHullPoint.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/QuickHullPoint.java @@ -13,4 +13,14 @@ public class QuickHullPoint { this.v = v; this.idx = idx; } + + public QuickHullPoint(long t, double v) { + this.t = t; + this.v = v; + } + + @Override + public String toString() { + return "(" + t + "," + v + ")"; + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java index e6a70882edf..b39ae7b630a 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/IOMonitor2.java @@ -19,6 +19,11 @@ package org.apache.iotdb.tsfile.read.common; +import org.apache.iotdb.tsfile.file.metadata.statistics.QuickHullPoint; + +import java.util.BitSet; +import java.util.List; + public class IOMonitor2 { public enum DataSetType { // dataSet, executor, reader, file @@ -268,6 +273,43 @@ public class IOMonitor2 { return numerator / denominator; } + public static BitSet reverse(BitSet bitset) { + final BitSet reversed = new BitSet(bitset.length()); + + int reversedIndex = 0; + for (int i = bitset.length() - 1; i >= 0; i--) { + if (bitset.get(i)) { + reversed.set(reversedIndex); + } + reversedIndex++; + } + return reversed; + } + + public static int pointLocation(double tl, double vl, double t, double v, double tr, double vr) { + double cp1 = (tr - tl) * (v - vl) - (vr - vl) * (t - tl); + return Double.compare(cp1, 0); + // return 1 means P is above the line connecting l and r + // return -1 means P is below the line connecting l and r + // return 0 means P is on the line connecting l and r + } + + public static int checkSumSigns(double A, double B, List<QuickHullPoint> points) { + int size = points.size(); + QuickHullPoint p3 = points.get(size - 3); + QuickHullPoint p2 = points.get(size - 2); + QuickHullPoint p1 = points.get(size - 1); + + double x1 = (p2.t - p1.t) * A + (p2.v - p1.v) * B; + double x2 = (p2.t - p3.t) * A + (p2.v - p3.v) * B; + + return sign(x1) + sign(x2); + } + + public static int sign(double x) { + return Double.compare(x, 0); + } + public static void addMeasure(Operation operation, long elapsedTimeInNanosecond) { switch (operation) { case DCP_Server_Query_Execute:
