This is an automated email from the ASF dual-hosted git repository.

leirui pushed a commit to branch research/M4-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4344af57769a010962c05b899dae27b5cddfcb90 Author: Lei Rui <[email protected]> AuthorDate: Thu Jan 26 15:32:12 2023 +0800 finish --- .../dataset/groupby/LocalGroupByExecutor4CPV.java | 102 +++++++++------- .../apache/iotdb/db/integration/m4/MyTest1.java | 136 ++++++++++----------- 2 files changed, 123 insertions(+), 115 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java index c98c38eb36..14cf495d8f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java @@ -187,7 +187,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } calculateFirstPoint(currentChunkList, startTime, endTime, interval, curStartTime); -// calculateLastPoint(currentChunkList, startTime, endTime, interval, curStartTime); + calculateLastPoint(currentChunkList, startTime, endTime, interval, curStartTime); calculateBottomPoint(currentChunkList, startTime, endTime, interval, curStartTime); calculateTopPoint(currentChunkList, startTime, endTime, interval, curStartTime); @@ -631,14 +631,6 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { ChunkSuit4CPV susp_candidate = currentChunkList.get(0); if (susp_candidate.isLazyLoad()) { // 如果是lazy // load,则此时load、应用deletes、更新batchData和statistics,取消lazyLoad标记,然后回到循环1 -// currentChunkList.remove(susp_candidate); // TODO check this -// List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( -// susp_candidate.getChunkMetadata(), this.timeFilter); -// for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk -// ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime, -// currentChunkList, null, -// susp_candidate.getChunkMetadata()); // 
新增的ChunkSuit4CPV默认isLazyLoad=false -// } if (susp_candidate.getPageReader() == null) { List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( susp_candidate.getChunkMetadata(), this.timeFilter); @@ -646,8 +638,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } // TODO update FP equal to or after statistics.getEndTime susp_candidate.updateFPwithTheClosetPointEqualOrAfter( - susp_candidate.getStatistics().getEndTime()); // TODO DEBUG - + susp_candidate.getStatistics().getStartTime()); // TODO DEBUG + susp_candidate.setLazyLoad(false); // DO NOT FORGET THIS!!! continue; // 回到循环1 } else { // 如果不是lazy load,则该疑似candidate就是真正的candidate。 // 于是verification判断该点是否被更高优先级(更高优先级这一点在QueryUtils.modifyChunkMetaData(chunkMetadataList, @@ -667,7 +659,6 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { deleteCursor++; } else if (deleteIntervalList.get(deleteCursor).contains(candidateTimestamp)) { isDeletedItself = true; -// deleteEndTime = Math.max(deleteEndTime, deleteIntervalList.get(deleteCursor).getMax()); deleteEndTime = deleteIntervalList.get(deleteCursor).getMax(); // deleteEndTime可能会大于当前的endTime,因为delete起点可以在整个chunk startTime之后 break; // since delete intervals are already sorted and merged @@ -675,16 +666,6 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { break; // since delete intervals are already sorted and merged } } -// for (TimeRange timeRange : susp_candidate.getChunkMetadata().getDeleteIntervalList()) { -// // make sure that delete intervals are already sorted and merged -// // TODO debug -// if (timeRange.contains(candidateTimestamp)) { -// isDeletedItself = true; -// deleteEndTime = Math.max(deleteEndTime, timeRange.getMax()); -// // deleteEndTime不会超过chunkEndTime,因为否则的话这个chunk就会modifyChunkMetaData步骤里被处理掉整个删掉 -// // TODO check -// } -// } } // 如果被删除,标记该点所在chunk为lazy load,并且在不load数据的情况下更新chunkStartTime,然后回到循环1 if (isDeletedItself) { @@ -714,14 +695,15 @@ public class 
LocalGroupByExecutor4CPV implements GroupByExecutor { private void calculateLastPoint(List<ChunkSuit4CPV> currentChunkList, long startTime, long endTime, long interval, long curStartTime) throws IOException { - while (true) { // 循环1 + while (currentChunkList.size() > 0) { // 循环1 // 按照startTime和version排序,找出疑似LP candidate currentChunkList.sort( new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for different // aggregations public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - int res = ((Comparable) (o2.getChunkMetadata().getEndTime())).compareTo( - o1.getChunkMetadata().getEndTime()); + int res = ((Comparable) (o2.getStatistics().getEndTime())).compareTo( + o1.getStatistics().getEndTime()); + // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata if (res != 0) { return res; } else { @@ -737,39 +719,71 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { ChunkSuit4CPV susp_candidate = currentChunkList.get(0); if (susp_candidate.isLazyLoad()) { // 如果是lazy // load,则此时load、应用deletes、更新batchData和statistics,取消lazyLoad标记,然后回到循环1 - currentChunkList.remove(susp_candidate); // TODO check this - List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( - susp_candidate.getChunkMetadata(), this.timeFilter); - for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk - ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime, - currentChunkList, null, - susp_candidate.getChunkMetadata()); // 新增的ChunkSuit4CPV默认isLazyLoad=false +// currentChunkList.remove(susp_candidate); // TODO check this +// List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( +// susp_candidate.getChunkMetadata(), this.timeFilter); +// for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk +// ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime, +// currentChunkList, null, +// 
susp_candidate.getChunkMetadata()); // 新增的ChunkSuit4CPV默认isLazyLoad=false +// } + if (susp_candidate.getPageReader() == null) { + List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( + susp_candidate.getChunkMetadata(), this.timeFilter); + susp_candidate.setPageReader((PageReader) pageReaderList.get(0)); } + // TODO update FP equal to or after statistics.getEndTime + susp_candidate.updateLPwithTheClosetPointEqualOrBefore( + susp_candidate.getStatistics().getEndTime()); // TODO DEBUG + susp_candidate.setLazyLoad(false); // DO NOT FORGET THIS!!! continue; // 回到循环1 } else { // 如果不是lazy load,则该疑似candidate就是真正的candidate。 // 于是verification判断该点是否被更高优先级(更高优先级这一点在QueryUtils.modifyChunkMetaData(chunkMetadataList, // pathModifications)已做好)的deletes覆盖 - long candidateTimestamp = susp_candidate.getChunkMetadata().getEndTime(); // TODO check - Object candidateValue = susp_candidate.getChunkMetadata().getStatistics() - .getLastValue(); // TODO check + // TODO NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata + long candidateTimestamp = susp_candidate.getStatistics().getEndTime(); // TODO check + Object candidateValue = susp_candidate.getStatistics().getLastValue(); // TODO check boolean isDeletedItself = false; long deleteStartTime = Long.MAX_VALUE; // TODO check - if (susp_candidate.getChunkMetadata().getDeleteIntervalList() != null) { - for (TimeRange timeRange : susp_candidate.getChunkMetadata().getDeleteIntervalList()) { - if (timeRange.contains(candidateTimestamp)) { + List<TimeRange> deleteIntervalList = susp_candidate.getChunkMetadata() + .getDeleteIntervalList(); +// if (susp_candidate.getChunkMetadata().getDeleteIntervalList() != null) { +// for (TimeRange timeRange : susp_candidate.getChunkMetadata().getDeleteIntervalList()) { +// if (timeRange.contains(candidateTimestamp)) { +// isDeletedItself = true; +// deleteStartTime = Math.min(deleteStartTime, +// timeRange.getMin()); // 
deleteStartTime不会小于chunkStartTime,因为否则的话这个chunk就会modifyChunkMetaData步骤里被处理掉整个删掉 +// // TODO check +// } +// } +// } + if (deleteIntervalList != null) { + int deleteCursor = 0; + while (deleteCursor < deleteIntervalList.size()) { + if (deleteIntervalList.get(deleteCursor).getMax() < candidateTimestamp) { + deleteCursor++; + } else if (deleteIntervalList.get(deleteCursor).contains(candidateTimestamp)) { isDeletedItself = true; - deleteStartTime = Math.min(deleteStartTime, - timeRange.getMin()); // deleteStartTime不会小于chunkStartTime,因为否则的话这个chunk就会modifyChunkMetaData步骤里被处理掉整个删掉 - // TODO check + deleteStartTime = deleteIntervalList.get(deleteCursor).getMin(); + // deleteEndTime可能会大于当前的endTime,因为delete起点可以在整个chunk startTime之后 + break; // since delete intervals are already sorted and merged + } else { + break; // since delete intervals are already sorted and merged } } } // 如果被删除,标记该点所在chunk为lazy load,并且在不load数据的情况下更新chunkEndTime,然后回到循环1 if (isDeletedItself) { - susp_candidate.setLazyLoad(true); - susp_candidate.getChunkMetadata().getStatistics() - .setEndTime(deleteStartTime); // TODO check + if (deleteStartTime <= susp_candidate.getStatistics().getStartTime()) { // NOTE 这里计算的是LP + // TODO debug 整个区间内点删掉 + currentChunkList.remove(susp_candidate); + } else { + susp_candidate.setLazyLoad(true); + susp_candidate.getChunkMetadata().getStatistics().setEndTime(deleteStartTime - 1); + // -1 is because delete is closed interval + // TODO check + } continue; // 回到循环1 } else { // 否则,则就是计算结果,结束 diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java index 4ded79d4e5..ddbc69e6cc 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java @@ -19,37 +19,31 @@ package org.apache.iotdb.db.integration.m4; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import 
java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Locale; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; - -import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; -import org.apache.iotdb.tsfile.read.reader.page.PageReader; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; -import java.util.Locale; - -import static org.junit.Assert.fail; - public class MyTest1 { private static final String TIMESTAMP_STR = "Time"; private static String[] creationSqls = - new String[] { - "SET STORAGE GROUP TO root.vehicle.d0", - "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT64, ENCODING=PLAIN", + new String[]{ + "SET STORAGE GROUP TO root.vehicle.d0", + "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT64, ENCODING=PLAIN", // iotdb的int类型的plain编码用的是自制的不支持random access,所以值类型用long }; @@ -88,14 +82,14 @@ public class MyTest1 { prepareData1(); String[] res = - new String[] { - "0,1,20,5,20,5[1],30[10]", - "25,25,45,8,30,8[25],40[30]", - "50,52,54,8,18,8[52],18[54]", - "75,null,null,null,null,null,null" + new String[]{ + "0,1,20,5,20,5[1],30[10]", + "25,25,45,8,30,8[25],40[30]", + "50,52,54,8,18,8[52],18[54]", + "75,null,null,null,null,null,null" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ 
-121,7 +115,7 @@ public class MyTest1 { + "," + resultSet.getString(String.format("max_value(%s)", d0s0)); System.out.println(ans); -// Assert.assertEquals(res[i++], ans); + Assert.assertEquals(res[i++], ans); } } } catch (Exception e) { @@ -135,10 +129,10 @@ public class MyTest1 { prepareData5(); String[] res = - new String[] { + new String[]{ "0,1,20,5,20,5[1],30[10]", "25,25,45,8,30,8[25],40[30]", - "50,52,54,8,18,8[52],18[54]", + "50,null,null,null,null,null,null", "75,null,null,null,null,null,null" }; try (Connection connection = @@ -168,7 +162,7 @@ public class MyTest1 { + "," + resultSet.getString(String.format("max_value(%s)", d0s0)); System.out.println(ans); -// Assert.assertEquals(res[i++], ans); + Assert.assertEquals(res[i++], ans); } } } catch (Exception e) { @@ -181,8 +175,8 @@ public class MyTest1 { // data: // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { for (String sql : creationSqls) { @@ -270,14 +264,14 @@ public class MyTest1 { prepareData2(); String[] res = - new String[] { - "0,1,20,5,20,5[1],30[10]", - "25,25,27,8,20,8[25],20[27]", - "50,null,null,null,null,null,null", - "75,null,null,null,null,null,null" + new String[]{ + "0,1,20,5,20,5[1],30[10]", + "25,25,27,8,20,8[25],20[27]", + "50,null,null,null,null,null,null", + "75,null,null,null,null,null,null" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ -304,7 +298,7 @@ public class MyTest1 { + "," + 
resultSet.getString(String.format("max_value(%s)", d0s0)); System.out.println(ans); -// Assert.assertEquals(res[i++], ans); + Assert.assertEquals(res[i++], ans); } } } catch (Exception e) { @@ -317,8 +311,8 @@ public class MyTest1 { // data: // https://user-images.githubusercontent.com/33376433/151995378-07a2f8df-5cac-499a-ae88-e3b017eee07a.png try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { for (String sql : creationSqls) { @@ -364,16 +358,16 @@ public class MyTest1 { prepareData2(); String[] res = - new String[] { - "0,1,20,5,20,5[1],30[10]", - "25,25,27,8,20,8[25],20[27]", - "50,null,null,null,null,null,null", - "75,null,null,null,null,null,null", - "100,120,120,8,8,8[120],8[120]", - "125,null,null,null,null,null,null" + new String[]{ + "0,1,20,5,20,5[1],30[10]", + "25,25,27,8,20,8[25],20[27]", + "50,null,null,null,null,null,null", + "75,null,null,null,null,null,null", + "100,120,120,8,8,8[120],8[120]", + "125,null,null,null,null,null,null" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ -414,14 +408,14 @@ public class MyTest1 { prepareData3(); String[] res = - new String[] { - "0,1,22,5,4,1[10],10[2]", - "25,30,40,8,2,2[40],8[30]", - "50,55,72,5,4,4[72],20[62]", - "75,80,90,11,1,1[90],11[80]" + new String[]{ + "0,1,22,5,4,1[10],10[2]", + "25,30,40,8,2,2[40],8[30]", + "50,55,72,5,4,4[72],20[62]", + "75,80,90,11,1,1[90],11[80]" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", 
"root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ -448,7 +442,7 @@ public class MyTest1 { + "," + resultSet.getString(String.format("max_value(%s)", d0s0)); System.out.println(ans); -// Assert.assertEquals(res[i++], ans); + Assert.assertEquals(res[i++], ans); } } } catch (Exception e) { @@ -461,8 +455,8 @@ public class MyTest1 { // data: // https://user-images.githubusercontent.com/33376433/152003603-6b4e7494-00ff-47e4-bf6e-cab3c8600ce2.png try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { for (String sql : creationSqls) { @@ -502,14 +496,14 @@ public class MyTest1 { prepareData3_2(); String[] res = - new String[] { - "0,1,22,5,4,1[10],10[2]", - "25,30,40,8,2,2[40],8[30]", - "50,55,72,5,4,4[72],20[62]", - "75,80,90,11,1,1[90],11[80]" + new String[]{ + "0,1,22,5,4,1[10],10[2]", + "25,30,40,8,2,2[40],8[30]", + "50,55,72,5,4,4[72],20[62]", + "75,80,90,11,1,1[90],11[80]" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ -549,8 +543,8 @@ public class MyTest1 { // data: // https://user-images.githubusercontent.com/33376433/152003603-6b4e7494-00ff-47e4-bf6e-cab3c8600ce2.png try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { for (String sql : creationSqls) { @@ -590,14 +584,14 @@ public class MyTest1 { prepareData4(); String[] res = - new 
String[] { - "0,1,20,5,20,5[1],30[10]", - "25,25,45,8,30,8[25],30[45]", - "50,52,54,8,18,8[52],18[54]", - "75,null,null,null,null,null,null" + new String[]{ + "0,1,20,5,20,5[1],30[10]", + "25,25,45,8,30,8[25],30[45]", + "50,52,54,8,18,8[52],18[54]", + "75,null,null,null,null,null,null" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ -624,7 +618,7 @@ public class MyTest1 { + "," + resultSet.getString(String.format("max_value(%s)", d0s0)); System.out.println(ans); -// Assert.assertEquals(res[i++], ans); + Assert.assertEquals(res[i++], ans); } } } catch (Exception e) { @@ -637,8 +631,8 @@ public class MyTest1 { // data: // https://user-images.githubusercontent.com/33376433/152006061-f1d95952-3f5c-4d88-b34e-45d3bb61b600.png try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { for (String sql : creationSqls) {
