This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch aggregate
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/aggregate by this push:
new 2c1d124 add aggregation in qp
2c1d124 is described below
commit 2c1d124581da90b4f022024f74a845463a3be4c4
Author: qiaojialin <[email protected]>
AuthorDate: Sun Mar 10 16:46:01 2019 +0800
add aggregation in qp
---
.../apache/iotdb/db/qp/executor/OverflowQPExecutor.java | 8 +++++---
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 12 +++++++++---
.../db/query/executor/AggregateEngineExecutor.java | 1 +
.../java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java | 17 +++++++++--------
.../org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 7 ++++++-
5 files changed, 30 insertions(+), 15 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 86a3cc2..5df3e95 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -51,8 +51,10 @@ import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadDataPlan;
import org.apache.iotdb.db.qp.physical.sys.MetadataPlan;
import org.apache.iotdb.db.qp.physical.sys.PropertyPlan;
+import org.apache.iotdb.db.query.executor.EngineQueryRouter;
import org.apache.iotdb.db.utils.AuthUtils;
import org.apache.iotdb.db.utils.LoadDataUtils;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -176,9 +178,9 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
}
@Override
- public QueryDataSet aggregate(List<Pair<Path, String>> aggres, IExpression expression)
- throws ProcessorException {
- throw new ProcessorException("not support");
+ public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression)
+ throws ProcessorException, FileNodeManagerException, QueryFilterOptimizationException, PathErrorException, IOException {
+ return new EngineQueryRouter().aggregate(paths, aggres, expression);
}
// @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index b735617..86ae7f1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -28,8 +28,10 @@ import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.executor.EngineQueryRouter;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -45,12 +47,16 @@ public abstract class QueryProcessExecutor {
public QueryProcessExecutor() {
}
- public QueryDataSet processQuery(PhysicalPlan plan) throws IOException, FileNodeManagerException {
+ public QueryDataSet processQuery(PhysicalPlan plan) throws IOException, FileNodeManagerException, PathErrorException, QueryFilterOptimizationException, ProcessorException {
QueryPlan queryPlan = (QueryPlan) plan;
QueryExpression queryExpression =
QueryExpression.create().setSelectSeries(queryPlan.getPaths())
.setExpression(queryPlan.getExpression());
+ if(plan instanceof AggregationPlan) {
+ return aggregate(plan.getPaths(), plan.getAggregations(), ((AggregationPlan) plan).getExpression());
+ }
+
return queryRouter.query(queryExpression);
}
@@ -73,8 +79,8 @@ public abstract class QueryProcessExecutor {
this.fetchSize.set(fetchSize);
}
- public abstract QueryDataSet aggregate(List<Pair<Path, String>> aggres, IExpression expression)
- throws ProcessorException, IOException, PathErrorException;
+ public abstract QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression)
+ throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException;
public abstract QueryDataSet groupBy(List<Pair<Path, String>> aggres,
IExpression expression,
long unit,
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
index 4cb269f..c9d02ae 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/AggregateEngineExecutor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.query.aggregation.AggreFuncFactory;
+import org.apache.iotdb.db.query.aggregation.AggregateFunction;
import org.apache.iotdb.db.query.aggregation.impl.LastAggrFunc;
import org.apache.iotdb.db.query.aggregation.impl.MaxTimeAggrFunc;
import org.apache.iotdb.db.query.context.QueryContext;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
index 83d4906..02f8a69 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/plan/QPUpdateTest.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.exception.qp.QueryProcessorException;
import org.apache.iotdb.db.qp.QueryProcessor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.utils.MemIntQpExecutor;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.junit.After;
import org.junit.Test;
@@ -111,8 +112,8 @@ public class QPUpdateTest {
}
private void testUpdate()
- throws QueryProcessorException, ArgsErrorException, ProcessorException, IOException,
- FileNodeManagerException {
+ throws QueryProcessorException, ArgsErrorException, ProcessorException, IOException,
+ FileNodeManagerException, QueryFilterOptimizationException {
String sqlStr = "update root.qp_update_test.device_1.sensor_1 set value = 33000 where time >= 10 and time <= 10";
PhysicalPlan plan1 = processor.parseSQLToPhysicalPlan(sqlStr);
boolean upRet = processor.getExecutor().processNonQuery(plan1);
@@ -131,8 +132,8 @@ public class QPUpdateTest {
}
private void testDeletePaths()
- throws QueryProcessorException, ProcessorException, ArgsErrorException, IOException,
- FileNodeManagerException {
+ throws QueryProcessorException, ProcessorException, ArgsErrorException, IOException,
+ FileNodeManagerException, QueryFilterOptimizationException {
String sqlStr = "delete from root.qp_update_test.device_1 where time < 15";
PhysicalPlan plan1 = processor.parseSQLToPhysicalPlan(sqlStr);
boolean upRet = processor.getExecutor().processNonQuery(plan1);
@@ -154,8 +155,8 @@ public class QPUpdateTest {
}
private void testDelete()
- throws QueryProcessorException, ProcessorException, ArgsErrorException, IOException,
- FileNodeManagerException {
+ throws QueryProcessorException, ProcessorException, ArgsErrorException, IOException,
+ FileNodeManagerException, QueryFilterOptimizationException {
String sqlStr = "delete from root.qp_update_test.device_1.sensor_1 where time < 15";
PhysicalPlan plan1 = processor.parseSQLToPhysicalPlan(sqlStr);
boolean upRet = processor.getExecutor().processNonQuery(plan1);
@@ -177,8 +178,8 @@ public class QPUpdateTest {
}
private void testInsert()
- throws QueryProcessorException, ProcessorException, ArgsErrorException, IOException,
- FileNodeManagerException {
+ throws QueryProcessorException, ProcessorException, ArgsErrorException, IOException,
+ FileNodeManagerException, QueryFilterOptimizationException {
String sqlStr = "insert into root.qp_update_test.device_1 (timestamp, sensor_1, sensor_2) values (13, 50, 40)";
PhysicalPlan plan1 = processor.parseSQLToPhysicalPlan(sqlStr);
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
index c80bfcf..f2beb7d 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/qp/utils/MemIntQpExecutor.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.utils;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -25,6 +26,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.TreeSet;
+
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
@@ -32,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -102,7 +107,7 @@ public class MemIntQpExecutor extends QueryProcessExecutor {
}
@Override
- public QueryDataSet aggregate(List<Pair<Path, String>> aggres, IExpression expression) {
+ public QueryDataSet aggregate(List<Path> paths, List<String> aggres, IExpression expression) throws ProcessorException, IOException, PathErrorException, FileNodeManagerException, QueryFilterOptimizationException {
return null;
}