This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/LTS-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 64ca48054c6a71a2d4889249d357fc9eae30a9d0 Author: Lei Rui <[email protected]> AuthorDate: Sat Jan 27 20:39:37 2024 +0800 postprocess MinMax --- .../groupby/GroupByWithoutValueFilterDataSet.java | 88 ++++++++++++++++++++++ .../groupby/LocalGroupByExecutorTri_MinMax.java | 6 -- .../iotdb/db/integration/tri/MyTest_MinMax.java | 47 +++++------- 3 files changed, 107 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java index 07dbfecc3e3..9472f1f62fa 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java @@ -132,8 +132,96 @@ public class GroupByWithoutValueFilterDataSet extends GroupByEngineDataSet { } } + /** Each row correspond to result of a bucket */ + public List<List<AggregateResult>> getAll() throws IOException, QueryProcessException { + List<List<AggregateResult>> resultsAllBuckets = new ArrayList<>(); + GroupByExecutor executor = null; + for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) { + executor = pathToExecutorEntry.getValue(); // assume only one series here + break; + } + for (long localCurStartTime = startTime; + localCurStartTime < endTime; + localCurStartTime += interval) { // not change real curStartTime&curEndTime + List<AggregateResult> aggregations = + executor.calcResult( + localCurStartTime, localCurStartTime + interval, startTime, endTime, interval); + resultsAllBuckets.add(aggregations); // needs deep copy!! + } + + // make the next hasNextWithoutConstraint() false + curStartTime = endTime; + hasCachedTimeInterval = false; + + return resultsAllBuckets; + } + @Override public RowRecord nextWithoutConstraint() throws IOException { + if (CONFIG.getEnableTri().equals("MinMax")) { + return nextWithoutConstraintTri_MinMax(); + } + // } else if (CONFIG.getEnableTri().equals("MinMaxLTTB")) { + // // TODO + // } else if (CONFIG.getEnableTri().equals("M4LTTB")) { + // // TODO + // } else if (CONFIG.getEnableTri().equals("LTTB")) { + // // TODO + // } else if (CONFIG.getEnableTri().equals("ILTS")) { + // // TODO + // } + else { + return nextWithoutConstraint_raw(); + } + } + + public RowRecord nextWithoutConstraintTri_MinMax() throws IOException { + RowRecord record; + try { + GroupByExecutor executor = null; + for (Entry<PartialPath, GroupByExecutor> pathToExecutorEntry : pathExecutors.entrySet()) { + executor = pathToExecutorEntry.getValue(); // assume only one series here + break; + } + + // concat results into a string + record = new RowRecord(0); + StringBuilder series = new StringBuilder(); + + for (long localCurStartTime = startTime; + localCurStartTime < endTime; + localCurStartTime += interval) { // not change real curStartTime&curEndTime + // attention the returned aggregations need deep copy if using directly + List<AggregateResult> aggregations = + executor.calcResult( + localCurStartTime, + localCurStartTime + interval, + startTime, + endTime, + interval); // attention + for (AggregateResult aggregation : aggregations) { + // Each row correspond to (bucketLeftBound, minV[bottomT], maxV[topT]) of a MinMax bucket + series.append(aggregation.getResult()).append(","); + } + } + + // MIN_MAX_INT64 this type for field.setBinaryV(new Binary(value.toString())) + record.addField(series, TSDataType.MIN_MAX_INT64); + + } catch (QueryProcessException e) { + logger.error("GroupByWithoutValueFilterDataSet execute has error", e); + throw new IOException(e.getMessage(), e); + } + + // in the end, make the next hasNextWithoutConstraint() false + // as we already fetch all here + curStartTime = endTime; + hasCachedTimeInterval = false; + + return record; + } + + public RowRecord nextWithoutConstraint_raw() throws IOException { if (!hasCachedTimeInterval) { throw new IOException( "need to call hasNext() before calling next() " + "in GroupByWithoutValueFilterDataSet."); diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java index acf78c63ea4..ed4ced412ef 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutorTri_MinMax.java @@ -257,12 +257,6 @@ public class LocalGroupByExecutorTri_MinMax implements GroupByExecutor { } } - /** - * @param curStartTime closed - * @param curEndTime open - * @param startTime closed - * @param endTime open - */ @Override public List<AggregateResult> calcResult( long curStartTime, long curEndTime, long startTime, long endTime, long interval) diff --git a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java index 6f110759ca6..8364c497ada 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/tri/MyTest_MinMax.java @@ -98,9 +98,10 @@ public class MyTest_MinMax { @Test public void test1() throws Exception { prepareData1(); - - String[] res = - new String[] {"0,1[20],15[2]", "25,8[25],8[25]", "50,3[54],3[54]", "75,null,null"}; + // String[] res = new String[]{"0,1[20],15[2]", "25,8[25],8[25]", "50,3[54],3[54]", + // "75,null,null"}; + String res = "0,1[20],15[2],8[25],8[25],3[54],3[54],null,null,"; + // 0,BPv[t]ofBucket1,TPv[t]ofBucket1,BPv[t]ofBucket2,TPv[t]ofBucket2,... try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { @@ -112,14 +113,10 @@ public class MyTest_MinMax { try (ResultSet resultSet = statement.getResultSet()) { int i = 0; while (resultSet.next()) { - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(String.format("min_value(%s)", d0s0)) - + "," - + resultSet.getString(String.format("max_value(%s)", d0s0)); + // 注意从1开始编号,所以第一列是无意义时间戳 + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2); System.out.println(ans); - Assert.assertEquals(res[i++], ans); + Assert.assertEquals(res, ans); } } System.out.println(((IoTDBStatement) statement).executeFinish()); @@ -160,8 +157,10 @@ public class MyTest_MinMax { public void test3() { prepareData3(); - String[] res = - new String[] {"0,1[10],10[2]", "25,2[40],8[30]", "50,4[72],20[62]", "75,1[90],11[80]"}; + // String[] res = new String[]{"0,1[10],10[2]", "25,2[40],8[30]", "50,4[72],20[62]", + // "75,1[90],11[80]"}; + String res = "0,1[10],10[2],2[40],8[30],4[72],20[62],1[90],11[80],"; + // 0,BPv[t]ofBucket1,TPv[t]ofBucket1,BPv[t]ofBucket2,TPv[t]ofBucket2,... try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { @@ -175,14 +174,9 @@ public class MyTest_MinMax { try (ResultSet resultSet = statement.getResultSet()) { int i = 0; while (resultSet.next()) { - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(String.format("min_value(%s)", d0s0)) - + "," - + resultSet.getString(String.format("max_value(%s)", d0s0)); + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2); System.out.println(ans); - Assert.assertEquals(res[i++], ans); + Assert.assertEquals(res, ans); } } } catch (Exception e) { @@ -235,8 +229,10 @@ public class MyTest_MinMax { public void test3_2() { prepareData3_2(); - String[] res = - new String[] {"0,1[10],10[2]", "25,null,null", "50,4[72],20[62]", "75,1[90],11[80]"}; + // String[] res = new String[]{"0,1[10],10[2]", "25,null,null", "50,4[72],20[62]", + // "75,1[90],11[80]"}; + String res = "0,1[10],10[2],null,null,4[72],20[62],1[90],11[80],"; + // 0,BPv[t]ofBucket1,TPv[t]ofBucket1,BPv[t]ofBucket2,TPv[t]ofBucket2,... try (Connection connection = DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { @@ -250,14 +246,9 @@ public class MyTest_MinMax { try (ResultSet resultSet = statement.getResultSet()) { int i = 0; while (resultSet.next()) { - String ans = - resultSet.getString(TIMESTAMP_STR) - + "," - + resultSet.getString(String.format("min_value(%s)", d0s0)) - + "," - + resultSet.getString(String.format("max_value(%s)", d0s0)); + String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(2); System.out.println(ans); - Assert.assertEquals(res[i++], ans); + Assert.assertEquals(res, ans); } } } catch (Exception e) {
