This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml/windowSet by this push:
     new 70f05a320c fix bugs
70f05a320c is described below

commit 70f05a320cd94d786d1de35b2609a4a82abeb2a4
Author: liuminghui233 <[email protected]>
AuthorDate: Tue Nov 1 21:19:22 2022 +0800

    fix bugs
---
 .../main/java/org/apache/iotdb/SessionExample.java | 88 ++++++++++++----------
 .../timerangeiterator/SampleWindowIterator.java    |  2 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 11 +++
 .../planner/distribution/DistributionPlanner.java  |  4 +-
 .../SimpleFragmentParallelPlanner.java             |  4 +-
 .../planner/plan/node/process/WindowSplitNode.java |  6 ++
 .../apache/iotdb/session/SessionConnection.java    |  8 +-
 .../org/apache/iotdb/session/SessionDataSet.java   | 25 ++++++
 8 files changed, 101 insertions(+), 47 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java 
b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 43d93cdf97..fbe0c63c8a 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -21,7 +21,6 @@ package org.apache.iotdb;
 
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.SessionDataSet;
 import org.apache.iotdb.session.SessionDataSet.DataIterator;
@@ -73,45 +72,56 @@ public class SessionExample {
     // set session fetchSize
     session.setFetchSize(10000);
 
-    try {
-      session.setStorageGroup("root.sg1");
-    } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != 
TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
-        throw e;
+    List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", 
"root.sg1.d2.s1");
+    List<Integer> indexes = Arrays.asList(1, 3, 5, 7);
+    List<SessionDataSet> windowSet = session.fetchWindowSet(queryPaths, null, 
1, 40, 2, 2, indexes);
+    for (SessionDataSet window : windowSet) {
+      System.out.println(window.getColumnNames());
+      while (window.hasNext()) {
+        System.out.println(window.next());
       }
-    }
-
-    //     createTemplate();
-    createTimeseries();
-    createMultiTimeseries();
-    insertRecord();
-    insertTablet();
-    //    insertTabletWithNullValues();
-    //    insertTablets();
-    //    insertRecords();
-    //    insertText();
-    //    selectInto();
-    //    createAndDropContinuousQueries();
-    //    nonQuery();
-    //    query();
-    //    queryWithTimeout();
-    //    rawDataQuery();
-    //    lastDataQuery();
-    //    queryByIterator();
-    //    deleteData();
-    //    deleteTimeseries();
-    //    setTimeout();
-
-    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
-    sessionEnableRedirect.setEnableQueryRedirection(true);
-    sessionEnableRedirect.open(false);
-
-    // set session fetchSize
-    sessionEnableRedirect.setFetchSize(10000);
-
-    insertRecord4Redirect();
-    query4Redirect();
-    sessionEnableRedirect.close();
+      System.out.println("*********************************");
+    }
+
+    //    try {
+    //      session.setStorageGroup("root.sg1");
+    //    } catch (StatementExecutionException e) {
+    //      if (e.getStatusCode() != 
TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
+    //        throw e;
+    //      }
+    //    }
+    //
+    //    //     createTemplate();
+    //    createTimeseries();
+    //    createMultiTimeseries();
+    //    insertRecord();
+    //    insertTablet();
+    //    //    insertTabletWithNullValues();
+    //    //    insertTablets();
+    //    //    insertRecords();
+    //    //    insertText();
+    //    //    selectInto();
+    //    //    createAndDropContinuousQueries();
+    //    //    nonQuery();
+    //    //    query();
+    //    //    queryWithTimeout();
+    //    //    rawDataQuery();
+    //    //    lastDataQuery();
+    //    //    queryByIterator();
+    //    //    deleteData();
+    //    //    deleteTimeseries();
+    //    //    setTimeout();
+    //
+    //    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", 
"root");
+    //    sessionEnableRedirect.setEnableQueryRedirection(true);
+    //    sessionEnableRedirect.open(false);
+    //
+    //    // set session fetchSize
+    //    sessionEnableRedirect.setFetchSize(10000);
+    //
+    //    insertRecord4Redirect();
+    //    query4Redirect();
+    //    sessionEnableRedirect.close();
     session.close();
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
index 396d53d96b..0ba0dd06cf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
@@ -59,7 +59,7 @@ public class SampleWindowIterator implements 
ITimeRangeIterator {
   public TimeRange nextTimeRange() {
     while (allTimeRangeIterator.hasNextTimeRange()) {
       TimeRange timeRange = allTimeRangeIterator.nextTimeRange();
-      if (timeRangeIndex == samplingIndexes.get(sampleIndex)) {
+      if (timeRangeIndex + 1 == samplingIndexes.get(sampleIndex)) {
         curTimeRange = timeRange;
         timeRangeIndex++;
         sampleIndex++;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 6b5fc36659..ad46dbdfb1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1231,6 +1231,9 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     List<MeasurementPath> measurementPaths = schemaTree.getAllMeasurement();
     Set<Expression> sourceExpressions =
         
measurementPaths.stream().map(TimeSeriesOperand::new).collect(Collectors.toSet());
+    for (Expression sourceExpression : sourceExpressions) {
+      analyzeExpression(analysis, sourceExpression);
+    }
     analysis.setSourceExpressions(sourceExpressions);
 
     // set transform
@@ -1245,6 +1248,9 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
                           new LinkedHashMap<>(),
                           Collections.singletonList(expression)))
               .collect(Collectors.toSet());
+      for (Expression sourceTransformExpression : sourceTransformExpressions) {
+        analyzeExpression(analysis, sourceTransformExpression);
+      }
       analysis.setSourceTransformExpressions(sourceTransformExpressions);
     }
 
@@ -1257,6 +1263,11 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
             .collect(Collectors.toList());
     analysis.setRespDatasetHeader(new DatasetHeader(columnHeaders, false));
 
+    Set<String> deviceSet =
+        
measurementPaths.stream().map(PartialPath::getDevice).collect(Collectors.toSet());
+    DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet, 
schemaTree);
+    analysis.setDataPartitionInfo(dataPartition);
+
     return analysis;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 60882f5cdf..8a6267353b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 
 import java.util.List;
@@ -67,7 +68,8 @@ public class DistributionPlanner {
   public DistributedQueryPlan planFragments() {
     PlanNode rootAfterRewrite = rewriteSource();
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
-    if (analysis.getStatement() instanceof QueryStatement) {
+    if (analysis.getStatement() instanceof QueryStatement
+        || analysis.getStatement() instanceof FetchWindowSetStatement) {
       analysis
           .getRespDatasetHeader()
           .setColumnToTsBlockIndexMap(rootWithExchange.getOutputColumnNames());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 75087b1dbb..03341cf2be 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
@@ -114,7 +115,8 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
     fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet));
 
-    if (analysis.getStatement() instanceof QueryStatement) {
+    if (analysis.getStatement() instanceof QueryStatement
+        || analysis.getStatement() instanceof FetchWindowSetStatement) {
       
fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
     }
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java
index 8ed9d404ef..83c2c6d81b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -131,4 +132,9 @@ public class WindowSplitNode extends SingleChildProcessNode 
{
   public String toString() {
     return String.format("WindowSplitNode-%s", getPlanNodeId());
   }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitWindowSplit(this, context);
+  }
 }
diff --git 
a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java 
b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 5f24969113..6966b89ae8 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -510,18 +510,16 @@ public class SessionConnection {
 
     List<SessionDataSet> windowSet = new ArrayList<>();
     for (List<ByteBuffer> queryResult : resp.getQueryResultList()) {
-      windowSet.add(
+      SessionDataSet sessionDataSet =
           new SessionDataSet(
-              "",
               resp.columnNameList,
               resp.columnTypeList,
               resp.columnNameIndexMap,
               resp.queryId,
               statementId,
-              client,
               sessionId,
-              queryResult,
-              false));
+              queryResult);
+      windowSet.add(sessionDataSet);
     }
     return windowSet;
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java 
b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index f7b725ab73..c04134e327 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -98,6 +98,31 @@ public class SessionDataSet implements AutoCloseable {
             timeout);
   }
 
+  public SessionDataSet(
+      List<String> columnNameList,
+      List<String> columnTypeList,
+      Map<String, Integer> columnNameIndex,
+      long queryId,
+      long statementId,
+      long sessionId,
+      List<ByteBuffer> queryResult) {
+    this.ioTDBRpcDataSet =
+        new IoTDBRpcDataSet(
+            "",
+            columnNameList,
+            columnTypeList,
+            columnNameIndex,
+            false,
+            false,
+            queryId,
+            statementId,
+            null,
+            sessionId,
+            queryResult,
+            SessionConfig.DEFAULT_FETCH_SIZE,
+            0);
+  }
+
   public int getFetchSize() {
     return ioTDBRpcDataSet.fetchSize;
   }

Reply via email to