This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cherry_pick_cluster_2
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 0a45c19c8f07ca8014ef7d7c4a37fd9956b1de32
Author: jt2594838 <[email protected]>
AuthorDate: Wed Mar 11 17:40:39 2020 +0800

    cherry-pick changes from cluster_new:
    1. getAllClosedStorageGroupTsFile is now grouped by partition
    2. fix empty AggregateResult not being serialized correctly
    3. fix merging two empty AvgAggrResults producing a wrong result (NaN)
    4. fix reset in First/LastValue
    5. change member protection levels
    6. extract GroupByExecutor and LocalGroupByExecutor
    7. extract getters of readers and datasets
    8. extract fill initialization
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  20 ++-
 .../org/apache/iotdb/db/metadata/MManager.java     |   2 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   1 -
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |   2 +-
 .../db/query/aggregation/AggregateResult.java      | 101 ++++++-----
 .../db/query/aggregation/impl/AvgAggrResult.java   |   9 +
 .../aggregation/impl/FirstValueAggrResult.java     |   6 +
 .../aggregation/impl/LastValueAggrResult.java      |   6 +
 .../dataset/groupby/GroupByEngineDataSet.java      |  13 +-
 .../db/query/dataset/groupby/GroupByExecutor.java  |  15 ++
 .../groupby/GroupByWithValueFilterDataSet.java     |  26 ++-
 .../groupby/GroupByWithoutValueFilterDataSet.java  | 185 ++------------------
 .../dataset/groupby/LocalGroupByExecutor.java      | 187 +++++++++++++++++++++
 .../db/query/executor/AggregationExecutor.java     |  16 +-
 .../iotdb/db/query/executor/FillQueryExecutor.java |  11 +-
 .../iotdb/db/query/executor/QueryRouter.java       |  23 ++-
 .../java/org/apache/iotdb/db/query/fill/IFill.java |   8 +
 .../iotdb/db/query/reader/series/SeriesReader.java |   4 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  17 +-
 .../org/apache/iotdb/db/utils/FilePathUtils.java   |   6 +
 .../iotdb/db/qp/plan/ConcatOptimizerTest.java      |  22 +++
 .../tsfile/read/query/dataset/QueryDataSet.java    |   3 +
 22 files changed, 420 insertions(+), 263 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java 
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 9680f39..a4923ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -529,15 +529,23 @@ public class StorageEngine implements IService {
 
   /**
    *
-   * @return TsFiles (seq or unseq) grouped by their storage group.
+   * @return TsFiles (seq or unseq) grouped by their storage group and 
partition number.
    */
-  public Map<String, List<TsFileResource>> getAllClosedStorageGroupTsFile() {
-    Map<String, List<TsFileResource>> ret = new HashMap<>();
+  public Map<String, Map<Integer, List<TsFileResource>>> 
getAllClosedStorageGroupTsFile() {
+    Map<String, Map<Integer, List<TsFileResource>>> ret = new HashMap<>();
     for (Entry<String, StorageGroupProcessor> entry : processorMap
         .entrySet()) {
-      ret.computeIfAbsent(entry.getKey(), sg -> new 
ArrayList<>()).addAll(entry.getValue().getSequenceFileTreeSet());
-      ret.get(entry.getKey()).addAll(entry.getValue().getUnSequenceFileList());
-      ret.get(entry.getKey()).removeIf(file -> !file.isClosed());
+      List<TsFileResource> sequenceFiles = 
entry.getValue().getSequenceFileTreeSet();
+      for (TsFileResource sequenceFile : sequenceFiles) {
+        if (!sequenceFile.isClosed()) {
+          continue;
+        }
+        String[] fileSplits = FilePathUtils.splitTsFilePath(sequenceFile);
+        int partitionNum = Integer.parseInt(fileSplits[fileSplits.length - 2]);
+        Map<Integer, List<TsFileResource>> storageGroupFiles = 
ret.computeIfAbsent(entry.getKey()
+            ,n -> new HashMap<>());
+        storageGroupFiles.computeIfAbsent(partitionNum, n -> new 
ArrayList<>()).add(sequenceFile);
+      }
     }
     return ret;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java 
b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 00b21cc..f5a20ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -885,7 +885,7 @@ public class MManager {
    * and the wildcard will be removed.
    * If the wildcard is at the tail, then the inference will go on until the 
storage groups are found
    * and the wildcard will be kept.
-   * (2) Suppose the part of the path is a substring that begin after the 
storage group name. (e.g.,
+   * (2) Suppose the part of the path is a substring that begins after the 
storage group name. (e.g.,
    *  For "root.*.sg1.a.*.b.*" and "root.x.sg1" is a storage group, then this 
part is "a.*.b.*").
    *  For this part, keep what it is.
    *
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java 
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index 45c53a7..4e06220 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -299,7 +299,6 @@ public class PhysicalGenerator {
           try {
             // remove stars in SELECT to get actual paths
             List<String> actualPaths = 
getMatchedTimeseries(fullPath.getFullPath());
-
             // for actual non exist path
             if (actualPaths.isEmpty() && originAggregations.isEmpty()) {
               String nonExistMeasurement = fullPath.getMeasurement();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
 
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index 32a9a50..0cf280f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -119,7 +119,7 @@ public class ConcatPathOptimizer implements 
ILogicalOptimizer {
     if(!isAlignByDevice){
       sfwOperator.setFilterOperator(concatFilter(prefixPaths, filter, 
filterPaths));
     }
-    filter.setPathSet(filterPaths);
+    sfwOperator.getFilterOperator().setPathSet(filterPaths);
     // GROUP_BY_DEVICE leaves the concatFilter to PhysicalGenerator to 
optimize filter without prefix first
 
     return sfwOperator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
index ee083ee..ad95518 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
@@ -44,7 +45,7 @@ public abstract class AggregateResult {
   private double doubleValue;
   private Binary binaryValue;
 
-  private boolean hasResult;
+  protected boolean hasResult;
 
   /**
    * construct.
@@ -110,29 +111,32 @@ public abstract class AggregateResult {
     TSDataType dataType = TSDataType.deserialize(buffer.getShort());
     AggregateResult aggregateResult = AggregateResultFactory
         .getAggrResultByType(aggregationType, dataType);
-    switch (dataType) {
-      case BOOLEAN:
-        aggregateResult.setBooleanValue(ReadWriteIOUtils.readBool(buffer));
-        break;
-      case INT32:
-        aggregateResult.setIntValue(buffer.getInt());
-        break;
-      case INT64:
-        aggregateResult.setLongValue(buffer.getLong());
-        break;
-      case FLOAT:
-        aggregateResult.setFloatValue(buffer.getFloat());
-        break;
-      case DOUBLE:
-        aggregateResult.setDoubleValue(buffer.getDouble());
-        break;
-      case TEXT:
-        aggregateResult.setBinaryValue(ReadWriteIOUtils.readBinary(buffer));
-        break;
-      default:
-        throw new IllegalArgumentException("Invalid Aggregation Type: " + 
dataType.name());
+    boolean hasResult = ReadWriteIOUtils.readBool(buffer);
+    if (hasResult) {
+      switch (dataType) {
+        case BOOLEAN:
+          aggregateResult.setBooleanValue(ReadWriteIOUtils.readBool(buffer));
+          break;
+        case INT32:
+          aggregateResult.setIntValue(buffer.getInt());
+          break;
+        case INT64:
+          aggregateResult.setLongValue(buffer.getLong());
+          break;
+        case FLOAT:
+          aggregateResult.setFloatValue(buffer.getFloat());
+          break;
+        case DOUBLE:
+          aggregateResult.setDoubleValue(buffer.getDouble());
+          break;
+        case TEXT:
+          aggregateResult.setBinaryValue(ReadWriteIOUtils.readBinary(buffer));
+          break;
+        default:
+          throw new IllegalArgumentException("Invalid Aggregation Type: " + 
dataType.name());
+      }
+      aggregateResult.deserializeSpecificFields(buffer);
     }
-    aggregateResult.deserializeSpecificFields(buffer);
     return aggregateResult;
   }
 
@@ -141,29 +145,32 @@ public abstract class AggregateResult {
   public void serializeTo(OutputStream outputStream) throws IOException {
     aggregationType.serializeTo(outputStream);
     ReadWriteIOUtils.write(resultDataType, outputStream);
-    switch (resultDataType) {
-      case BOOLEAN:
-        ReadWriteIOUtils.write(booleanValue, outputStream);
-        break;
-      case INT32:
-        ReadWriteIOUtils.write(intValue, outputStream);
-        break;
-      case INT64:
-        ReadWriteIOUtils.write(longValue, outputStream);
-        break;
-      case FLOAT:
-        ReadWriteIOUtils.write(floatValue, outputStream);
-        break;
-      case DOUBLE:
-        ReadWriteIOUtils.write(doubleValue, outputStream);
-        break;
-      case TEXT:
-        ReadWriteIOUtils.write(binaryValue, outputStream);
-        break;
-      default:
-        throw new IllegalArgumentException("Invalid Aggregation Type: " + 
resultDataType.name());
+    ReadWriteIOUtils.write(hasResult(), outputStream);
+    if (hasResult()) {
+      switch (resultDataType) {
+        case BOOLEAN:
+          ReadWriteIOUtils.write(booleanValue, outputStream);
+          break;
+        case INT32:
+          ReadWriteIOUtils.write(intValue, outputStream);
+          break;
+        case INT64:
+          ReadWriteIOUtils.write(longValue, outputStream);
+          break;
+        case FLOAT:
+          ReadWriteIOUtils.write(floatValue, outputStream);
+          break;
+        case DOUBLE:
+          ReadWriteIOUtils.write(doubleValue, outputStream);
+          break;
+        case TEXT:
+          ReadWriteIOUtils.write(binaryValue, outputStream);
+          break;
+        default:
+          throw new IllegalArgumentException("Invalid Aggregation Type: " + 
resultDataType.name());
+      }
+      serializeSpecificFields(outputStream);
     }
-    serializeSpecificFields(outputStream);
   }
 
   protected abstract void serializeSpecificFields(OutputStream outputStream) 
throws IOException;
@@ -294,4 +301,8 @@ public abstract class AggregateResult {
   public String toString() {
     return String.valueOf(getResult());
   }
+
+  public AggregationType getAggregationType() {
+    return aggregationType;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
index 1e44444..e93d0e6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java
@@ -45,6 +45,11 @@ public class AvgAggrResult extends AggregateResult {
   }
 
   @Override
+  protected boolean hasResult() {
+    return cnt > 0;
+  }
+
+  @Override
   public Double getResult() {
     if (cnt > 0) {
       setDoubleValue(avg);
@@ -120,6 +125,10 @@ public class AvgAggrResult extends AggregateResult {
   @Override
   public void merge(AggregateResult another) {
     AvgAggrResult anotherAvg = (AvgAggrResult) another;
+    if (anotherAvg.cnt == 0) {
+      // avoid two empty results producing an NaN
+      return;
+    }
     avg = avg * ((double) cnt / (cnt + anotherAvg.cnt)) +
         anotherAvg.avg * ((double) anotherAvg.cnt / (cnt + anotherAvg.cnt));
     cnt += anotherAvg.cnt;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
index 52da9c0..2dc3e2b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java
@@ -42,6 +42,12 @@ public class FirstValueAggrResult extends AggregateResult {
   }
 
   @Override
+  public void reset() {
+    super.reset();
+    timestamp = Long.MAX_VALUE;
+  }
+
+  @Override
   public Object getResult() {
     return hasResult() ? getValue() : null;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
index 2077af4..13a6a67 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java
@@ -41,6 +41,12 @@ public class LastValueAggrResult extends AggregateResult {
   }
 
   @Override
+  public void reset() {
+    super.reset();
+    timestamp = Long.MIN_VALUE;
+  }
+
+  @Override
   public Object getResult() {
     return hasResult() ? getValue() : null;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index 94d290e..4ca7ceb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -27,18 +27,21 @@ import org.apache.iotdb.tsfile.utils.Pair;
 public abstract class GroupByEngineDataSet extends QueryDataSet {
 
   protected long queryId;
-  private long interval;
-  private long slidingStep;
+  protected long interval;
+  protected long slidingStep;
   // total query [startTime, endTime)
-  private long startTime;
-  private long endTime;
+  protected long startTime;
+  protected long endTime;
 
   // current interval [curStartTime, curEndTime)
   protected long curStartTime;
   protected long curEndTime;
-  private int usedIndex;
+  protected int usedIndex;
   protected boolean hasCachedTimeInterval;
 
+  public GroupByEngineDataSet() {
+  }
+
   /**
    * groupBy query.
    */
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
new file mode 100644
index 0000000..ced8008
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByExecutor.java
@@ -0,0 +1,15 @@
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+public interface GroupByExecutor {
+  void addAggregateResult(AggregateResult aggrResult, int index);
+
+  void resetAggregateResults();
+
+  List<Pair<AggregateResult, Integer>> calcResult(long curStartTime, long 
curEndTime) throws IOException, QueryProcessException;
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 44402cc..a951001 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -29,11 +29,14 @@ import 
org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
 
 public class GroupByWithValueFilterDataSet extends GroupByEngineDataSet {
@@ -53,7 +56,10 @@ public class GroupByWithValueFilterDataSet extends 
GroupByEngineDataSet {
   /**
    * group by batch calculation size.
    */
-  private int timeStampFetchSize;
+  protected int timeStampFetchSize;
+
+  public GroupByWithValueFilterDataSet() {
+  }
 
   /**
    * constructor.
@@ -74,18 +80,28 @@ public class GroupByWithValueFilterDataSet extends 
GroupByEngineDataSet {
   /**
    * init reader and aggregate function.
    */
-  private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
+  protected void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
       throws StorageEngineException {
-    this.timestampGenerator = new 
ServerTimeGenerator(groupByPlan.getExpression(), context);
+    this.timestampGenerator = getTimeGenerator(groupByPlan.getExpression(), 
context);
     this.allDataReaderList = new ArrayList<>();
     this.groupByPlan = groupByPlan;
     for (int i = 0; i < paths.size(); i++) {
       Path path = paths.get(i);
-      allDataReaderList.add(new SeriesReaderByTimestamp(path, 
dataTypes.get(i), context,
-          QueryResourceManager.getInstance().getQueryDataSource(path, context, 
null), null));
+      allDataReaderList.add(getReaderByTime(path, dataTypes.get(i), context, 
null));
     }
   }
 
+  protected TimeGenerator getTimeGenerator(IExpression expression, 
QueryContext context)
+      throws StorageEngineException {
+    return new ServerTimeGenerator(expression, context);
+  }
+
+  protected IReaderByTimestamp getReaderByTime(Path path,
+      TSDataType dataType, QueryContext context, TsFileFilter fileFilter) 
throws StorageEngineException {
+    return new SeriesReaderByTimestamp(path, dataType, context,
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, 
null), fileFilter);
+  }
+
   @Override
   protected RowRecord nextWithoutConstraint() throws IOException {
     if (!hasCachedTimeInterval) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 023cbf3..82c478a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -20,27 +20,20 @@
 package org.apache.iotdb.db.query.dataset.groupby;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.db.query.reader.series.IAggregateReader;
-import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
@@ -54,7 +47,9 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
       .getLogger(GroupByWithoutValueFilterDataSet.class);
 
   private Map<Path, GroupByExecutor> pathExecutors = new HashMap<>();
-  private TimeRange timeRange;
+
+  public GroupByWithoutValueFilterDataSet() {
+  }
 
   /**
    * constructor.
@@ -66,7 +61,7 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
     initGroupBy(context, groupByPlan);
   }
 
-  private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
+  protected void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
       throws StorageEngineException {
     IExpression expression = groupByPlan.getExpression();
 
@@ -80,7 +75,7 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
       if (!pathExecutors.containsKey(path)) {
         //init GroupByExecutor
         pathExecutors.put(path,
-            new GroupByExecutor(path, dataTypes.get(i), context, timeFilter));
+           getGroupByExecutor(path, dataTypes.get(i), context, timeFilter, 
null));
       }
       AggregateResult aggrResult = AggregateResultFactory
           
.getAggrResultByName(groupByPlan.getDeduplicatedAggregations().get(i),
@@ -97,7 +92,6 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
     }
     hasCachedTimeInterval = false;
     RowRecord record = new RowRecord(curStartTime);
-    timeRange = new TimeRange(curStartTime, curEndTime - 1);
 
     AggregateResult[] fields = new AggregateResult[paths.size()];
 
@@ -105,7 +99,7 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
       for (Entry<Path, GroupByExecutor> pathGroupByExecutorEntry : 
pathExecutors.entrySet()) {
         GroupByExecutor executor = pathGroupByExecutorEntry.getValue();
         executor.resetAggregateResults();
-        List<Pair<AggregateResult, Integer>> aggregations = 
executor.calcResult();
+        List<Pair<AggregateResult, Integer>> aggregations = 
executor.calcResult(curStartTime, curEndTime);
         for (Pair<AggregateResult, Integer> aggregation : aggregations) {
           fields[aggregation.right] = aggregation.left;
         }
@@ -125,165 +119,10 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
     return record;
   }
 
-  private class GroupByExecutor {
-
-    private IAggregateReader reader;
-    private BatchData preCachedData;
-    //<aggFunction - indexForRecord> of path
-    private List<Pair<AggregateResult, Integer>> results = new ArrayList<>();
-
-    GroupByExecutor(Path path, TSDataType dataType, QueryContext context, 
Filter timeFilter)
-        throws StorageEngineException {
-      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
-          .getQueryDataSource(path, context, timeFilter);
-      // update filter by TTL
-      timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
-      this.reader = new SeriesAggregateReader(path, dataType, context, 
queryDataSource, timeFilter,
-          null, null);
-      this.preCachedData = null;
-    }
-
-    private List<Pair<AggregateResult, Integer>> calcResult()
-        throws IOException, QueryProcessException {
-      if (calcFromCacheData()) {
-        return results;
-      }
-
-      //read page data firstly
-      if (readAndCalcFromPage()) {
-        return results;
-      }
-
-      //read chunk finally
-      while (reader.hasNextChunk()) {
-        Statistics chunkStatistics = reader.currentChunkStatistics();
-        if (chunkStatistics.getStartTime() >= curEndTime) {
-          return results;
-        }
-        //calc from chunkMetaData
-        if (reader.canUseCurrentChunkStatistics() && timeRange.contains(
-            new TimeRange(chunkStatistics.getStartTime(), 
chunkStatistics.getEndTime()))) {
-          calcFromStatistics(chunkStatistics);
-          reader.skipCurrentChunk();
-          if(isEndCalc()){
-            return results;
-          }
-          continue;
-        }
-        if (readAndCalcFromPage()) {
-          return results;
-        }
-      }
-      return results;
-    }
-
-    private void addAggregateResult(AggregateResult aggrResult, int index) {
-      results.add(new Pair<>(aggrResult, index));
-    }
-
-    private boolean isEndCalc() {
-      for (Pair<AggregateResult, Integer> result : results) {
-        if (!result.left.isCalculatedAggregationResult()) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    private boolean calcFromCacheData() throws IOException {
-      calcFromBatch(preCachedData);
-      // The result is calculated from the cache
-      return (preCachedData != null && preCachedData.getMaxTimestamp() >= 
curEndTime)
-          || isEndCalc();
-    }
-
-    private void calcFromBatch(BatchData batchData) throws IOException {
-      // is error data
-      if (batchData == null
-          || !batchData.hasCurrent()
-          || batchData.getMaxTimestamp() < curStartTime
-          || batchData.currentTime() >= curEndTime) {
-        return;
-      }
-
-      for (Pair<AggregateResult, Integer> result : results) {
-        //current agg method has been calculated
-        if (result.left.isCalculatedAggregationResult()) {
-          continue;
-        }
-        //lazy reset batch data for calculation
-        batchData.resetBatchData();
-        //skip points that cannot be calculated
-        while (batchData.currentTime() < curStartTime && 
batchData.hasCurrent()) {
-          batchData.next();
-        }
-        if (batchData.hasCurrent()) {
-          result.left.updateResultFromPageData(batchData, curEndTime);
-        }
-      }
-      //can calc for next interval
-      if (batchData.getMaxTimestamp() >= curEndTime) {
-        preCachedData = batchData;
-      }
-    }
-
-    private void calcFromStatistics(Statistics statistics)
-        throws QueryProcessException {
-      for (Pair<AggregateResult, Integer> result : results) {
-        //cacl is compile
-        if (result.left.isCalculatedAggregationResult()) {
-          continue;
-        }
-        result.left.updateResultFromStatistics(statistics);
-      }
-    }
-
-    // clear all results
-    private void resetAggregateResults() {
-      for (Pair<AggregateResult, Integer> result : results) {
-        result.left.reset();
-      }
-    }
-
-
-    private boolean readAndCalcFromPage() throws IOException, 
QueryProcessException {
-      while (reader.hasNextPage()) {
-        Statistics pageStatistics = reader.currentPageStatistics();
-        //must be non overlapped page
-        if (pageStatistics != null) {
-          //current page max than time range
-          if (pageStatistics.getStartTime() >= curEndTime) {
-            return true;
-          }
-          //can use pageHeader
-          if (reader.canUseCurrentPageStatistics() && timeRange.contains(
-              new TimeRange(pageStatistics.getStartTime(), 
pageStatistics.getEndTime()))) {
-            calcFromStatistics(pageStatistics);
-            reader.skipCurrentPage();
-            if (isEndCalc()) {
-              return true;
-            }
-            continue;
-          }
-        }
-        // calc from page data
-        BatchData batchData = reader.nextPage();
-        if (batchData == null || !batchData.hasCurrent()) {
-          continue;
-        }
-        // stop calc and cached current batchData
-        if (batchData.currentTime() >= curEndTime) {
-          preCachedData = batchData;
-          return true;
-        }
-
-        calcFromBatch(batchData);
-        if (isEndCalc() || batchData.currentTime() >= curEndTime) {
-          return true;
-        }
-      }
-      return false;
-    }
+  protected GroupByExecutor getGroupByExecutor(Path path,
+      TSDataType dataType,
+      QueryContext context, Filter timeFilter, TsFileFilter fileFilter)
+      throws StorageEngineException {
+    return new LocalGroupByExecutor(path, dataType, context, timeFilter, 
fileFilter);
   }
-
 }
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
new file mode 100644
index 0000000..a009266
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
@@ -0,0 +1,187 @@
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.IAggregateReader;
+import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+/**
+ * A {@link GroupByExecutor} that aggregates one series, interval by interval, through an
+ * {@link IAggregateReader}.
+ *
+ * <p>Each {@link #calcResult(long, long)} call first consumes the batch cached by a previous
+ * call, then the pages the reader currently exposes, and finally the remaining chunks. Page and
+ * chunk statistics are used instead of raw data whenever the reader allows it and the
+ * statistics' time span is contained in the interval.
+ */
+public class LocalGroupByExecutor implements GroupByExecutor {
+
+  private IAggregateReader reader;
+  // batch that was read but reaches the end of the interval it was read for; reused later
+  private BatchData preCachedData;
+  //<aggFunction - indexForRecord> of path
+  private List<Pair<AggregateResult, Integer>> results = new ArrayList<>();
+  // set to [curStartTime, curEndTime - 1] at the start of every calcResult call
+  private TimeRange timeRange;
+
+  /**
+   * Builds the aggregate reader for {@code path}.
+   *
+   * @param path series to aggregate
+   * @param dataType data type of the series
+   * @param context query context handed to the data source lookup and to the reader
+   * @param timeFilter time filter of the query; it is replaced by the TTL-adjusted filter
+   *     before the reader is created
+   * @param fileFilter file filter passed through to the reader
+   * @throws StorageEngineException propagated from acquiring the query data source
+   */
+  public LocalGroupByExecutor(Path path, TSDataType dataType, QueryContext context,
+      Filter timeFilter, TsFileFilter fileFilter)
+      throws StorageEngineException {
+    QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+        .getQueryDataSource(path, context, timeFilter);
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+    this.reader = new SeriesAggregateReader(path, dataType, context, queryDataSource,
+        timeFilter, null, fileFilter);
+    this.preCachedData = null;
+    timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
+  }
+
+  /**
+   * Registers an aggregation to be computed by this executor.
+   *
+   * @param aggrResult accumulator updated by {@link #calcResult(long, long)}
+   * @param index index stored next to the accumulator in the returned pairs
+   */
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult, int index) {
+    results.add(new Pair<>(aggrResult, index));
+  }
+
+  /**
+   * @return true when every registered aggregation reports that its result is already
+   *     calculated (also true when no aggregation is registered)
+   */
+  private boolean isEndCalc() {
+    for (Pair<AggregateResult, Integer> result : results) {
+      if (!result.left.isCalculatedAggregationResult()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Feeds the batch cached by a previous call (if any) into the aggregations.
+   *
+   * @return true when the interval is finished: the cached batch reaches {@code curEndTime}
+   *     or all aggregations are already calculated
+   */
+  private boolean calcFromCacheData(long curStartTime, long curEndTime) throws IOException {
+    calcFromBatch(preCachedData, curStartTime, curEndTime);
+    // The result is calculated from the cache
+    return (preCachedData != null && preCachedData.getMaxTimestamp() >= curEndTime)
+        || isEndCalc();
+  }
+
+  /**
+   * Updates every unfinished aggregation with the points of {@code batchData} that are not
+   * before {@code curStartTime}; {@code curEndTime} is passed to the aggregation as the bound.
+   * If the batch reaches {@code curEndTime} it is cached for the next interval.
+   */
+  private void calcFromBatch(BatchData batchData, long curStartTime, long curEndTime)
+      throws IOException {
+    // nothing to do: no batch, batch exhausted, or batch entirely outside the interval
+    if (batchData == null
+        || !batchData.hasCurrent()
+        || batchData.getMaxTimestamp() < curStartTime
+        || batchData.currentTime() >= curEndTime) {
+      return;
+    }
+
+    for (Pair<AggregateResult, Integer> result : results) {
+      //current agg method has been calculated
+      if (result.left.isCalculatedAggregationResult()) {
+        continue;
+      }
+      //lazy reset batch data for calculation
+      batchData.resetBatchData();
+      //skip points that cannot be calculated; check hasCurrent() first so that
+      //currentTime() is never read from an exhausted batch
+      while (batchData.hasCurrent() && batchData.currentTime() < curStartTime) {
+        batchData.next();
+      }
+      if (batchData.hasCurrent()) {
+        result.left.updateResultFromPageData(batchData, curEndTime);
+      }
+    }
+    //can calc for next interval
+    if (batchData.getMaxTimestamp() >= curEndTime) {
+      preCachedData = batchData;
+    }
+  }
+
+  /**
+   * Updates every unfinished aggregation directly from page or chunk statistics.
+   */
+  private void calcFromStatistics(Statistics pageStatistics)
+      throws QueryProcessException {
+    for (Pair<AggregateResult, Integer> result : results) {
+      //calculation is already complete
+      if (result.left.isCalculatedAggregationResult()) {
+        continue;
+      }
+      result.left.updateResultFromStatistics(pageStatistics);
+    }
+  }
+
+  /**
+   * Calculates all registered aggregations for one interval; {@code curStartTime} is
+   * inclusive and {@code curEndTime} is exclusive (the time range is set to
+   * {@code curEndTime - 1}).
+   *
+   * @return the internal list of (aggregation, index) pairs; the same list instance is
+   *     returned by every call
+   */
+  @Override
+  public List<Pair<AggregateResult, Integer>> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    timeRange.set(curStartTime, curEndTime - 1);
+    if (calcFromCacheData(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    //read page data firstly
+    if (readAndCalcFromPage(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    //read chunk finally
+    while (reader.hasNextChunk()) {
+      Statistics chunkStatistics = reader.currentChunkStatistics();
+      if (chunkStatistics.getStartTime() >= curEndTime) {
+        return results;
+      }
+      //calc from chunkMetaData
+      if (reader.canUseCurrentChunkStatistics()
+          && timeRange.contains(chunkStatistics.getStartTime(), chunkStatistics.getEndTime())) {
+        calcFromStatistics(chunkStatistics);
+        reader.skipCurrentChunk();
+        continue;
+      }
+      if (readAndCalcFromPage(curStartTime, curEndTime)) {
+        return results;
+      }
+    }
+    return results;
+  }
+
+  // clear all results
+  @Override
+  public void resetAggregateResults() {
+    for (Pair<AggregateResult, Integer> result : results) {
+      result.left.reset();
+    }
+  }
+
+
+  /**
+   * Consumes the pages the reader currently exposes, from statistics when possible and from
+   * page data otherwise.
+   *
+   * @return true when the interval is finished (a page starts at or after {@code curEndTime},
+   *     the data read reaches {@code curEndTime}, or all aggregations are calculated); false
+   *     when the pages ran out first
+   */
+  private boolean readAndCalcFromPage(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    while (reader.hasNextPage()) {
+      Statistics pageStatistics = reader.currentPageStatistics();
+      //must be non overlapped page
+      if (pageStatistics != null) {
+        //current page max than time range
+        if (pageStatistics.getStartTime() >= curEndTime) {
+          return true;
+        }
+        //can use pageHeader
+        if (reader.canUseCurrentPageStatistics()
+            && timeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) {
+          calcFromStatistics(pageStatistics);
+          reader.skipCurrentPage();
+          if (isEndCalc()) {
+            return true;
+          }
+          continue;
+        }
+      }
+      // calc from page data
+      BatchData batchData = reader.nextPage();
+      if (batchData == null || !batchData.hasCurrent()) {
+        continue;
+      }
+      // stop calc and cached current batchData
+      if (batchData.currentTime() >= curEndTime) {
+        preCachedData = batchData;
+        return true;
+      }
+
+      calcFromBatch(batchData, curStartTime, curEndTime);
+      // the batch may be fully consumed here, so only read currentTime() when a point exists
+      if (isEndCalc() || (batchData.hasCurrent() && batchData.currentTime() >= curEndTime)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 6882e7b..1e63f5c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -57,7 +57,7 @@ public class AggregationExecutor {
 
   private List<Path> selectedSeries;
   protected List<TSDataType> dataTypes;
-  private List<String> aggregations;
+  protected List<String> aggregations;
   protected IExpression expression;
 
   /**
@@ -65,7 +65,7 @@ public class AggregationExecutor {
    **/
   private int aggregateFetchSize;
 
-  AggregationExecutor(AggregationPlan aggregationPlan) {
+  protected AggregationExecutor(AggregationPlan aggregationPlan) {
     this.selectedSeries = aggregationPlan.getDeduplicatedPaths();
     this.dataTypes = aggregationPlan.getDeduplicatedDataTypes();
     this.aggregations = aggregationPlan.getDeduplicatedAggregations();
@@ -78,7 +78,7 @@ public class AggregationExecutor {
    *
    * @param context query context
    */
-  QueryDataSet executeWithoutValueFilter(QueryContext context)
+  public QueryDataSet executeWithoutValueFilter(QueryContext context)
       throws StorageEngineException, IOException, QueryProcessException {
 
     Filter timeFilter = null;
@@ -109,7 +109,7 @@ public class AggregationExecutor {
    * @param context query context
    * @return AggregateResult list
    */
-  private List<AggregateResult> aggregateOneSeries(
+  protected List<AggregateResult> aggregateOneSeries(
       Map.Entry<Path, List<Integer>> pathToAggrIndexes,
       Filter timeFilter, QueryContext context)
       throws IOException, QueryProcessException, StorageEngineException {
@@ -128,7 +128,7 @@ public class AggregationExecutor {
     return aggregateResultList;
   }
 
-  private static void aggregateOneSeries(Path seriesPath, QueryContext 
context, Filter timeFilter,
+  public static void aggregateOneSeries(Path seriesPath, QueryContext context, 
Filter timeFilter,
       TSDataType tsDataType, List<AggregateResult> aggregateResultList, 
TsFileFilter fileFilter)
       throws StorageEngineException, IOException, QueryProcessException {
 
@@ -227,7 +227,7 @@ public class AggregationExecutor {
    *
    * @param context query context.
    */
-  QueryDataSet executeWithValueFilter(QueryContext context)
+  public QueryDataSet executeWithValueFilter(QueryContext context)
       throws StorageEngineException, IOException {
 
     TimeGenerator timestampGenerator = getTimeGenerator(context);
@@ -249,11 +249,11 @@ public class AggregationExecutor {
     return constructDataSet(aggregateResults);
   }
 
-  private TimeGenerator getTimeGenerator(QueryContext context) throws 
StorageEngineException {
+  protected TimeGenerator getTimeGenerator(QueryContext context) throws 
StorageEngineException {
     return new ServerTimeGenerator(expression, context);
   }
 
-  private IReaderByTimestamp getReaderByTime(Path path, TSDataType dataType,
+  protected IReaderByTimestamp getReaderByTime(Path path, TSDataType dataType,
       QueryContext context) throws StorageEngineException {
     return new SeriesReaderByTimestamp(path,
         dataType, context,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index a14742d..08cdde1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -88,9 +88,7 @@ public class FillQueryExecutor {
       } else {
         fill = typeIFillMap.get(dataType).copy();
       }
-      fill.setDataType(dataType);
-      fill.setQueryTime(queryTime);
-      fill.constructReaders(path, context);
+      configureFill(fill, dataType, path, context, queryTime);
 
       TimeValuePair timeValuePair = fill.getFillResult();
       if (timeValuePair == null || timeValuePair.getValue() == null) {
@@ -104,4 +102,11 @@ public class FillQueryExecutor {
     dataSet.setRecord(record);
     return dataSet;
   }
+
+  /**
+   * Prepares {@code fill} for execution: sets its data type and query time, then has it
+   * construct its readers for {@code path} under {@code context}.
+   */
+  protected void configureFill(IFill fill, TSDataType dataType, Path path, 
QueryContext context,
+      long queryTime) throws StorageEngineException {
+    fill.setDataType(dataType);
+    fill.setQueryTime(queryTime);
+    fill.constructReaders(path, context);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java 
b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 9807933..6ed625e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -138,12 +138,22 @@ public class QueryRouter implements IQueryRouter {
     groupByPlan.setExpression(optimizedExpression);
 
     if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
-      return new GroupByWithoutValueFilterDataSet(context, groupByPlan);
+      return getGroupByWithoutValueFilterDataSet(context, groupByPlan);
     } else {
-      return new GroupByWithValueFilterDataSet(context, groupByPlan);
+      return getGroupByWithValueFilterDataSet(context, groupByPlan);
     }
   }
 
+  /**
+   * Factory for the data set used when the group-by expression is a global time expression;
+   * returns a new {@link GroupByWithoutValueFilterDataSet} for the given context and plan.
+   */
+  protected GroupByWithoutValueFilterDataSet 
getGroupByWithoutValueFilterDataSet(QueryContext context, GroupByPlan plan)
+      throws StorageEngineException {
+    return new GroupByWithoutValueFilterDataSet(context, plan);
+  }
+
+  /**
+   * Factory for the data set used when the group-by expression is not a global time
+   * expression; returns a new {@link GroupByWithValueFilterDataSet} for the context and plan.
+   */
+  protected GroupByWithValueFilterDataSet 
getGroupByWithValueFilterDataSet(QueryContext context, GroupByPlan plan)
+      throws StorageEngineException {
+    return new GroupByWithValueFilterDataSet(context, plan);
+  }
+
   @Override
   public QueryDataSet fill(FillQueryPlan fillQueryPlan, QueryContext context)
       throws StorageEngineException, QueryProcessException, IOException {
@@ -152,11 +162,18 @@ public class QueryRouter implements IQueryRouter {
     long queryTime = fillQueryPlan.getQueryTime();
     Map<TSDataType, IFill> fillType = fillQueryPlan.getFillType();
 
-    FillQueryExecutor fillQueryExecutor = new FillQueryExecutor(fillPaths, 
dataTypes, queryTime,
+    FillQueryExecutor fillQueryExecutor = getFillExecutor(fillPaths, 
dataTypes, queryTime,
         fillType);
     return fillQueryExecutor.execute(context);
   }
 
+  /**
+   * Factory for the executor that runs a fill query; returns a new {@link FillQueryExecutor}
+   * built from the given paths, data types, query time and per-type fills.
+   */
+  protected FillQueryExecutor getFillExecutor(
+      List<Path> fillPaths,
+      List<TSDataType> dataTypes, long queryTime,
+      Map<TSDataType, IFill> fillType) {
+    return new FillQueryExecutor(fillPaths, dataTypes, queryTime, fillType);
+  }
+
   @Override
   public QueryDataSet lastQuery(LastQueryPlan lastQueryPlan, QueryContext 
context)
           throws StorageEngineException, QueryProcessException, IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java 
b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index f1d9a21..d8eb77b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -57,6 +57,14 @@ public abstract class IFill {
         timeFilter, null, null);
   }
 
+  /** Replaces the reader held in {@code allDataReader} with the given one. */
+  public void setAllDataReader(IBatchReader allDataReader) {
+    this.allDataReader = allDataReader;
+  }
+
+  /** Public accessor that returns whatever {@code constructFilter()} produces. */
+  public Filter getFilter() {
+    return constructFilter();
+  }
+
   public abstract TimeValuePair getFillResult() throws IOException, 
UnSupportedFillTypeException;
 
   public TSDataType getDataType() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
 
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 1d7825d..ef27d37 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -45,7 +45,7 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import java.io.IOException;
 import java.util.*;
 
-class SeriesReader {
+public class SeriesReader {
 
   private final Path seriesPath;
   private final TSDataType dataType;
@@ -94,7 +94,7 @@ class SeriesReader {
   private boolean hasCachedNextOverlappedPage;
   private BatchData cachedBatchData;
 
-  SeriesReader(Path seriesPath, TSDataType dataType, QueryContext context,
+  public SeriesReader(Path seriesPath, TSDataType dataType, QueryContext 
context,
       QueryDataSource dataSource, Filter timeFilter, Filter valueFilter, 
TsFileFilter fileFilter) {
     this.seriesPath = seriesPath;
     this.dataType = dataType;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java 
b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 3ad965c..803bdf5 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -239,10 +239,8 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     for (long statementId : statementIds) {
       Set<Long> queryIds = statementId2QueryId.getOrDefault(statementId, 
Collections.emptySet());
       for (long queryId : queryIds) {
-        queryId2DataSet.remove(queryId);
-
         try {
-          QueryResourceManager.getInstance().endQuery(queryId);
+          releaseQueryResource(queryId);
         } catch (StorageEngineException e) {
           // release as many as resources as possible, so do not break as soon 
as one exception is
           // raised
@@ -300,7 +298,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
   /**
    * release single operation resource
    */
-  private void releaseQueryResource(long queryId) throws 
StorageEngineException {
+  protected void releaseQueryResource(long queryId) throws 
StorageEngineException {
     // remove the corresponding Physical Plan
     queryId2DataSet.remove(queryId);
     QueryResourceManager.getInstance().endQuery(queryId);
@@ -778,6 +776,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     columnTypes.add(TSDataType.TEXT.toString()); // the DEVICE column of 
ALIGN_BY_DEVICE result
     List<TSDataType> deduplicatedColumnsType = new ArrayList<>();
     deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of 
ALIGN_BY_DEVICE result
+
     Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
     Map<String, TSDataType> checker = plan.getMeasurementDataTypeMap();
 
@@ -812,7 +811,6 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     plan.setPaths(null);
   }
 
-
   @Override
   public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
     try {
@@ -831,8 +829,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
             fillRpcReturnData(req.fetchSize, queryDataSet, 
sessionIdUsernameMap.get(req.sessionId));
         boolean hasResultSet = result.bufferForTime().limit() != 0;
         if (!hasResultSet) {
-          QueryResourceManager.getInstance().endQuery(req.queryId);
-          queryId2DataSet.remove(req.queryId);
+          releaseQueryResource(req.queryId);
         }
         TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
         resp.setHasResultSet(hasResultSet);
@@ -942,7 +939,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     return queryDataSet;
   }
 
-  private QueryContext genQueryContext(long queryId) {
+  protected QueryContext genQueryContext(long queryId) {
     return new QueryContext(queryId);
   }
 
@@ -1019,7 +1016,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     return AuthorityChecker.check(username, paths, plan.getOperatorType(), 
targetUser);
   }
 
-  void handleClientExit() {
+  protected void handleClientExit() {
     Long sessionId = currSessionId.get();
     if (sessionId != null) {
       TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
@@ -1299,7 +1296,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
     return null;
   }
 
-  private TSStatus executePlan(PhysicalPlan plan) {
+  protected TSStatus executePlan(PhysicalPlan plan) {
     boolean execRet;
     try {
       execRet = executeNonQuery(plan);
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index 4109382..d89e6fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -19,9 +19,12 @@
 package org.apache.iotdb.db.utils;
 
 import java.io.File;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 
 public class FilePathUtils {
 
+  private static final String PATH_SPLIT_STRING = File.separator.equals("\\") 
? "\\\\" : "/";
+
   private FilePathUtils() {
     // forbidding instantiation
   }
@@ -39,4 +42,7 @@ public class FilePathUtils {
     return filePath;
   }
 
+  /**
+   * Splits the absolute path of the resource's file into its components, using
+   * {@code PATH_SPLIT_STRING} as the regular-expression delimiter.
+   */
+  public static String[] splitTsFilePath(TsFileResource resource) {
+    return resource.getFile().getAbsolutePath().split(PATH_SPLIT_STRING);
+  }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/qp/plan/ConcatOptimizerTest.java 
b/server/src/test/java/org/apache/iotdb/db/qp/plan/ConcatOptimizerTest.java
index 015a094..ffa3526 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/plan/ConcatOptimizerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/ConcatOptimizerTest.java
@@ -34,6 +34,8 @@ import 
org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.read.filter.ValueFilter;
 import org.junit.After;
@@ -97,4 +99,24 @@ public class ConcatOptimizerTest {
         ValueFilter.lt(10));
     assertEquals(seriesExpression.toString(), ((RawDataQueryPlan) 
plan).getExpression().toString());
   }
+
+  /**
+   * Parses a query whose value filter applies to a wildcard device path
+   * ({@code root.laptop.*}) and expects the plan's expression to be the AND of one
+   * {@code s1 < 10} expression per device (d1, d2, d3), compared by string form.
+   */
+  @Test
+  public void testConcatMultipleDeviceInFilter() throws QueryProcessException {
+    String inputSQL = "select s1 from root.laptop.* where s1 < 10";
+    PhysicalPlan plan = processor.parseSQLToPhysicalPlan(inputSQL);
+    IExpression expression = BinaryExpression.and(
+        BinaryExpression.and(
+            new SingleSeriesExpression(
+                new Path("root.laptop.d1.s1"),
+                ValueFilter.lt(10)),
+            new SingleSeriesExpression(
+                new Path("root.laptop.d2.s1"),
+                ValueFilter.lt(10))
+        ),
+        new SingleSeriesExpression(
+            new Path("root.laptop.d3.s1"),
+            ValueFilter.lt(10))
+    );
+    assertEquals(expression.toString(), ((RawDataQueryPlan) 
plan).getExpression().toString());
+  }
 }
\ No newline at end of file
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index f976d0f..bebae01 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -33,6 +33,9 @@ public abstract class QueryDataSet {
   protected int rowOffset = 0;
   protected int alreadyReturnedRowNum = 0;
 
+  /** No-argument constructor; unlike the two-argument one it assigns no paths or data types. */
+  public QueryDataSet() {
+  }
+
   public QueryDataSet(List<Path> paths, List<TSDataType> dataTypes) {
     this.paths = paths;
     this.dataTypes = dataTypes;

Reply via email to