This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml/windowSet by this push:
new 70f05a320c fix bugs
70f05a320c is described below
commit 70f05a320cd94d786d1de35b2609a4a82abeb2a4
Author: liuminghui233 <[email protected]>
AuthorDate: Tue Nov 1 21:19:22 2022 +0800
fix bugs
---
.../main/java/org/apache/iotdb/SessionExample.java | 88 ++++++++++++----------
.../timerangeiterator/SampleWindowIterator.java | 2 +-
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 11 +++
.../planner/distribution/DistributionPlanner.java | 4 +-
.../SimpleFragmentParallelPlanner.java | 4 +-
.../planner/plan/node/process/WindowSplitNode.java | 6 ++
.../apache/iotdb/session/SessionConnection.java | 8 +-
.../org/apache/iotdb/session/SessionDataSet.java | 25 ++++++
8 files changed, 101 insertions(+), 47 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 43d93cdf97..fbe0c63c8a 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -21,7 +21,6 @@ package org.apache.iotdb;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.SessionDataSet.DataIterator;
@@ -73,45 +72,56 @@ public class SessionExample {
// set session fetchSize
session.setFetchSize(10000);
- try {
- session.setStorageGroup("root.sg1");
- } catch (StatementExecutionException e) {
- if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
- throw e;
+ List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1");
+ List<Integer> indexes = Arrays.asList(1, 3, 5, 7);
+ List<SessionDataSet> windowSet = session.fetchWindowSet(queryPaths, null, 1, 40, 2, 2, indexes);
+ for (SessionDataSet window : windowSet) {
+ System.out.println(window.getColumnNames());
+ while (window.hasNext()) {
+ System.out.println(window.next());
}
- }
-
- // createTemplate();
- createTimeseries();
- createMultiTimeseries();
- insertRecord();
- insertTablet();
- // insertTabletWithNullValues();
- // insertTablets();
- // insertRecords();
- // insertText();
- // selectInto();
- // createAndDropContinuousQueries();
- // nonQuery();
- // query();
- // queryWithTimeout();
- // rawDataQuery();
- // lastDataQuery();
- // queryByIterator();
- // deleteData();
- // deleteTimeseries();
- // setTimeout();
-
- sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
- sessionEnableRedirect.setEnableQueryRedirection(true);
- sessionEnableRedirect.open(false);
-
- // set session fetchSize
- sessionEnableRedirect.setFetchSize(10000);
-
- insertRecord4Redirect();
- query4Redirect();
- sessionEnableRedirect.close();
+ System.out.println("*********************************");
+ }
+
+ // try {
+ // session.setStorageGroup("root.sg1");
+ // } catch (StatementExecutionException e) {
+ // if (e.getStatusCode() != TSStatusCode.PATH_ALREADY_EXIST_ERROR.getStatusCode()) {
+ // throw e;
+ // }
+ // }
+ //
+ // // createTemplate();
+ // createTimeseries();
+ // createMultiTimeseries();
+ // insertRecord();
+ // insertTablet();
+ // // insertTabletWithNullValues();
+ // // insertTablets();
+ // // insertRecords();
+ // // insertText();
+ // // selectInto();
+ // // createAndDropContinuousQueries();
+ // // nonQuery();
+ // // query();
+ // // queryWithTimeout();
+ // // rawDataQuery();
+ // // lastDataQuery();
+ // // queryByIterator();
+ // // deleteData();
+ // // deleteTimeseries();
+ // // setTimeout();
+ //
+ // sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
+ // sessionEnableRedirect.setEnableQueryRedirection(true);
+ // sessionEnableRedirect.open(false);
+ //
+ // // set session fetchSize
+ // sessionEnableRedirect.setFetchSize(10000);
+ //
+ // insertRecord4Redirect();
+ // query4Redirect();
+ // sessionEnableRedirect.close();
session.close();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
index 396d53d96b..0ba0dd06cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
@@ -59,7 +59,7 @@ public class SampleWindowIterator implements ITimeRangeIterator {
public TimeRange nextTimeRange() {
while (allTimeRangeIterator.hasNextTimeRange()) {
TimeRange timeRange = allTimeRangeIterator.nextTimeRange();
- if (timeRangeIndex == samplingIndexes.get(sampleIndex)) {
+ if (timeRangeIndex + 1 == samplingIndexes.get(sampleIndex)) {
curTimeRange = timeRange;
timeRangeIndex++;
sampleIndex++;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 6b5fc36659..ad46dbdfb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1231,6 +1231,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
List<MeasurementPath> measurementPaths = schemaTree.getAllMeasurement();
Set<Expression> sourceExpressions =
measurementPaths.stream().map(TimeSeriesOperand::new).collect(Collectors.toSet());
+ for (Expression sourceExpression : sourceExpressions) {
+ analyzeExpression(analysis, sourceExpression);
+ }
analysis.setSourceExpressions(sourceExpressions);
// set transform
@@ -1245,6 +1248,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
new LinkedHashMap<>(),
Collections.singletonList(expression)))
.collect(Collectors.toSet());
+ for (Expression sourceTransformExpression : sourceTransformExpressions) {
+ analyzeExpression(analysis, sourceTransformExpression);
+ }
analysis.setSourceTransformExpressions(sourceTransformExpressions);
}
@@ -1257,6 +1263,11 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
.collect(Collectors.toList());
analysis.setRespDatasetHeader(new DatasetHeader(columnHeaders, false));
+ Set<String> deviceSet =
+ measurementPaths.stream().map(PartialPath::getDevice).collect(Collectors.toSet());
+ DataPartition dataPartition = fetchDataPartitionByDevices(deviceSet, schemaTree);
+ analysis.setDataPartitionInfo(dataPartition);
+
return analysis;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 60882f5cdf..8a6267353b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import java.util.List;
@@ -67,7 +68,8 @@ public class DistributionPlanner {
public DistributedQueryPlan planFragments() {
PlanNode rootAfterRewrite = rewriteSource();
PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
- if (analysis.getStatement() instanceof QueryStatement) {
+ if (analysis.getStatement() instanceof QueryStatement
+ || analysis.getStatement() instanceof FetchWindowSetStatement) {
analysis
.getRespDatasetHeader()
.setColumnToTsBlockIndexMap(rootWithExchange.getOutputColumnNames());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 75087b1dbb..03341cf2be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -114,7 +115,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
fragmentInstance.setDataRegionAndHost(regionReplicaSet);
fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet));
- if (analysis.getStatement() instanceof QueryStatement) {
+ if (analysis.getStatement() instanceof QueryStatement
+ || analysis.getStatement() instanceof FetchWindowSetStatement) {
fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
}
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java
index 8ed9d404ef..83c2c6d81b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -131,4 +132,9 @@ public class WindowSplitNode extends SingleChildProcessNode {
public String toString() {
return String.format("WindowSplitNode-%s", getPlanNodeId());
}
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitWindowSplit(this, context);
+ }
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 5f24969113..6966b89ae8 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -510,18 +510,16 @@ public class SessionConnection {
List<SessionDataSet> windowSet = new ArrayList<>();
for (List<ByteBuffer> queryResult : resp.getQueryResultList()) {
- windowSet.add(
+ SessionDataSet sessionDataSet =
new SessionDataSet(
- "",
resp.columnNameList,
resp.columnTypeList,
resp.columnNameIndexMap,
resp.queryId,
statementId,
- client,
sessionId,
- queryResult,
- false));
+ queryResult);
+ windowSet.add(sessionDataSet);
}
return windowSet;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
index f7b725ab73..c04134e327 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java
@@ -98,6 +98,31 @@ public class SessionDataSet implements AutoCloseable {
timeout);
}
+ public SessionDataSet(
+ List<String> columnNameList,
+ List<String> columnTypeList,
+ Map<String, Integer> columnNameIndex,
+ long queryId,
+ long statementId,
+ long sessionId,
+ List<ByteBuffer> queryResult) {
+ this.ioTDBRpcDataSet =
+ new IoTDBRpcDataSet(
+ "",
+ columnNameList,
+ columnTypeList,
+ columnNameIndex,
+ false,
+ false,
+ queryId,
+ statementId,
+ null,
+ sessionId,
+ queryResult,
+ SessionConfig.DEFAULT_FETCH_SIZE,
+ 0);
+ }
+
public int getFetchSize() {
return ioTDBRpcDataSet.fetchSize;
}