This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 167a30e2f30d89dc91670a731fbb7fadac161a27
Author: lta <[email protected]>
AuthorDate: Thu May 16 14:57:41 2019 +0800

    add fill feature
---
 .../query/executor/ClusterFillEngineExecutor.java  |  66 ++--
 .../cluster/query/executor/ClusterQueryRouter.java |  13 +-
 .../query/manager/common/FillBatchData.java        |  65 ++++
 .../querynode/ClusterLocalSingleQueryManager.java  |  86 +++--
 .../AbstractClusterPointReader.java                |   5 +-
 ...=> AbstractClusterSelectSeriesBatchReader.java} |   2 +-
 ...ava => ClusterFillSelectSeriesBatchReader.java} |  26 +-
 ...or.java => ClusterSelectSeriesBatchReader.java} |  13 +-
 ...ClusterSelectSeriesBatchReaderByTimestamp.java} |   7 +-
 .../query/utils/ClusterTimeValuePairUtils.java     |  26 ++
 .../query/utils/QueryPlanPartitionUtils.java       |  41 ++-
 .../cluster/integration/IoTDBFillQueryIT.java      | 366 +++++++++++++++++++++
 .../IoTDBQueryIT.java}                             |   4 +-
 .../IoTDBQueryLargeDataIT.java}                    |   4 +-
 .../query/manager/ClusterLocalManagerTest.java     |  66 ++--
 .../db/query/executor/FillEngineExecutor.java      |  11 +-
 .../db/query/executor/IFillEngineExecutor.java     |  23 +-
 .../java/org/apache/iotdb/db/query/fill/IFill.java |  14 +-
 .../org/apache/iotdb/db/query/fill/LinearFill.java |   4 +-
 .../apache/iotdb/db/query/fill/PreviousFill.java   |   6 +-
 20 files changed, 701 insertions(+), 147 deletions(-)

diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java
similarity index 57%
copy from 
iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
copy to 
cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java
index 83c5fa9..771637e 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterFillEngineExecutor.java
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-package org.apache.iotdb.db.query.executor;
+package org.apache.iotdb.cluster.query.executor;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import 
org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
+import 
org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -30,6 +31,7 @@ import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
+import org.apache.iotdb.db.query.executor.IFillEngineExecutor;
 import org.apache.iotdb.db.query.fill.IFill;
 import org.apache.iotdb.db.query.fill.PreviousFill;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -37,55 +39,57 @@ import 
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
-public class FillEngineExecutor {
+public class ClusterFillEngineExecutor implements IFillEngineExecutor {
 
-  private long jobId;
   private List<Path> selectedSeries;
   private long queryTime;
   private Map<TSDataType, IFill> typeIFillMap;
+  private ClusterRpcSingleQueryManager queryManager;
+
 
-  public FillEngineExecutor(long jobId, List<Path> selectedSeries, long 
queryTime,
-      Map<TSDataType, IFill> typeIFillMap) {
-    this.jobId = jobId;
+  public ClusterFillEngineExecutor(List<Path> selectedSeries, long queryTime,
+      Map<TSDataType, IFill> typeIFillMap, ClusterRpcSingleQueryManager 
queryManager) {
     this.selectedSeries = selectedSeries;
     this.queryTime = queryTime;
     this.typeIFillMap = typeIFillMap;
+    this.queryManager = queryManager;
   }
 
-  /**
-   * execute fill.
-   *
-   * @param context query context
-   */
+  @Override
   public QueryDataSet execute(QueryContext context)
       throws FileNodeManagerException, PathErrorException, IOException {
-    QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
-
+    Map<Path, ClusterSelectSeriesReader> selectPathReaders = 
queryManager.getSelectSeriesReaders();
+    List<Path> paths = new ArrayList<>();
     List<IFill> fillList = new ArrayList<>();
     List<TSDataType> dataTypeList = new ArrayList<>();
+    List<IPointReader> readers = new ArrayList<>();
     for (Path path : selectedSeries) {
-      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
-          .getQueryDataSource(path, context);
-      TSDataType dataType = 
MManager.getInstance().getSeriesType(path.getFullPath());
-      dataTypeList.add(dataType);
-      IFill fill = null;
-      if (!typeIFillMap.containsKey(dataType)) {
-        fill = new PreviousFill(dataType, queryTime, 0);
+      if (selectPathReaders.containsKey(path)) {
+        ClusterSelectSeriesReader reader = selectPathReaders.get(path);
+        readers.add(reader);
+        dataTypeList.add(reader.getDataType());
       } else {
-        fill = typeIFillMap.get(dataType).copy(path);
+        QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+            .getQueryDataSource(path, context);
+        TSDataType dataType = 
MManager.getInstance().getSeriesType(path.getFullPath());
+        dataTypeList.add(dataType);
+        IFill fill;
+        if (!typeIFillMap.containsKey(dataType)) {
+          fill = new PreviousFill(dataType, queryTime, 0);
+        } else {
+          fill = typeIFillMap.get(dataType).copy(path);
+        }
+        fill.setDataType(dataType);
+        fill.setQueryTime(queryTime);
+        fill.constructReaders(queryDataSource, context);
+        fillList.add(fill);
+        readers.add(fill.getFillResult());
       }
-      fill.setDataType(dataType);
-      fill.setQueryTime(queryTime);
-      fill.constructReaders(queryDataSource, context);
-      fillList.add(fill);
     }
 
-    List<IPointReader> readers = new ArrayList<>();
-    for (IFill fill : fillList) {
-      readers.add(fill.getFillResult());
-    }
+    QueryResourceManager.getInstance()
+        .beginQueryOfGivenQueryPaths(context.getJobId(), paths);
 
     return new EngineDataSetWithoutTimeGenerator(selectedSeries, dataTypeList, 
readers);
   }
-
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index 4211528..2fa4576 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.executor.FillEngineExecutor;
 import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
 import org.apache.iotdb.db.query.fill.IFill;
 import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -108,7 +109,17 @@ public class ClusterQueryRouter implements 
IEngineQueryRouter {
   @Override
   public QueryDataSet fill(List<Path> fillPaths, long queryTime, 
Map<TSDataType, IFill> fillType,
       QueryContext context) throws FileNodeManagerException, 
PathErrorException, IOException {
-    throw new UnsupportedOperationException();
+    ClusterRpcSingleQueryManager queryManager = 
ClusterRpcQueryManager.getInstance()
+        .getSingleQuery(context.getJobId());
+    try {
+      queryManager.initQueryResource(QueryType.NO_FILTER, 
getReadDataConsistencyLevel());
+
+      ClusterFillEngineExecutor fillEngineExecutor = new 
ClusterFillEngineExecutor(fillPaths, queryTime,
+          fillType, queryManager);
+      return fillEngineExecutor.execute(context);
+    } catch (IOException | RaftConnectionException e) {
+      throw new FileNodeManagerException(e);
+    }
   }
 
   public int getReadDataConsistencyLevel() {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java
new file mode 100644
index 0000000..3e128e3
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/common/FillBatchData.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.query.manager.common;
+
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+/**
+ * <code>FillBatchData</code> is a self-defined data structure used in the cluster fill-query
+ * process; it holds a single TimeValuePair, whose value may be null.
+ */
+public class FillBatchData extends BatchData {
+
+  private TimeValuePair timeValuePair;
+  private boolean isUsed;
+
+  public FillBatchData(TimeValuePair timeValuePair, boolean isUsed) {
+    this.timeValuePair = timeValuePair;
+    this.isUsed = isUsed;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !isUsed;
+  }
+
+  @Override
+  public void next() {
+    isUsed = true;
+  }
+
+  @Override
+  public long currentTime() {
+    return timeValuePair.getTimestamp();
+  }
+
+  @Override
+  public Object currentValue() {
+    if (!isUsed) {
+      return timeValuePair.getValue() == null ? null : 
timeValuePair.getValue().getValue();
+    } else {
+      return null;
+    }
+  }
+
+  public TimeValuePair getTimeValuePair() {
+    return isUsed ? null : timeValuePair;
+  }
+}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
index 37e3c57..f776477 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalSingleQueryManager.java
@@ -28,9 +28,10 @@ import 
org.apache.iotdb.cluster.concurrent.pool.QueryTimerManager;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.query.PathType;
 import org.apache.iotdb.cluster.query.factory.ClusterSeriesReaderFactory;
-import 
org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterBatchReader;
-import 
org.apache.iotdb.cluster.query.reader.querynode.ClusterBatchReaderByTimestamp;
-import 
org.apache.iotdb.cluster.query.reader.querynode.ClusterBatchReaderWithoutTimeGenerator;
+import 
org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader;
+import 
org.apache.iotdb.cluster.query.reader.querynode.ClusterFillSelectSeriesBatchReader;
+import 
org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader;
+import 
org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp;
 import 
org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader;
 import 
org.apache.iotdb.cluster.query.reader.querynode.IClusterFilterSeriesBatchReader;
 import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
@@ -39,6 +40,7 @@ import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataReques
 import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.InitSeriesReaderResponse;
 import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataByTimestampResponse;
 import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -46,11 +48,14 @@ import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
 import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.executor.AbstractExecutorWithoutTimeGenerator;
+import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.db.query.fill.PreviousFill;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.merge.EngineReaderByTimeStamp;
 import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -89,7 +94,7 @@ public class ClusterLocalSingleQueryManager implements 
IClusterLocalSingleQueryM
   /**
    * Key is series full path, value is reader of select series
    */
-  private Map<String, AbstractClusterBatchReader> selectSeriesReaders = new 
HashMap<>();
+  private Map<String, AbstractClusterSelectSeriesBatchReader> 
selectSeriesReaders = new HashMap<>();
 
   /**
    * Filter reader
@@ -130,6 +135,8 @@ public class ClusterLocalSingleQueryManager implements 
IClusterLocalSingleQueryM
         throw new UnsupportedOperationException();
       } else if (plan instanceof AggregationPlan) {
         throw new UnsupportedOperationException();
+      } else if (plan instanceof FillQueryPlan) {
+        handleFillSeriesRerader(plan, context, response);
       } else {
         if (plan.getExpression() == null
             || plan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
@@ -147,22 +154,40 @@ public class ClusterLocalSingleQueryManager implements 
IClusterLocalSingleQueryM
   }
 
   /**
-   * Handle filter series reader
+   * Handle fill series reader
    *
-   * @param plan filter series query plan
+   * @param queryPlan fill query plan
    */
-  private void handleFilterSeriesReader(QueryPlan plan, QueryContext context,
-      InitSeriesReaderRequest request, InitSeriesReaderResponse response, 
PathType pathType)
-      throws PathErrorException, QueryFilterOptimizationException, 
FileNodeManagerException, ProcessorException, IOException {
-    QueryDataSet queryDataSet = queryProcessExecutor
-        .processQuery(plan, context);
-    List<Path> paths = plan.getPaths();
-    List<TSDataType> dataTypes = queryDataSet.getDataTypes();
-    for (int i = 0; i < paths.size(); i++) {
-      dataTypeMap.put(paths.get(i).getFullPath(), dataTypes.get(i));
+  private void handleFillSeriesRerader(QueryPlan queryPlan, QueryContext 
context,
+      InitSeriesReaderResponse response)
+      throws FileNodeManagerException, PathErrorException, IOException {
+    FillQueryPlan fillQueryPlan = (FillQueryPlan) queryPlan;
+
+    List<Path> selectedPaths = queryPlan.getPaths();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedPaths);
+
+    Map<TSDataType, IFill> typeIFillMap = fillQueryPlan.getFillType();
+    for (Path path : selectedPaths) {
+      QueryDataSource queryDataSource = QueryResourceManager.getInstance()
+          .getQueryDataSource(path, context);
+      TSDataType dataType = 
MManager.getInstance().getSeriesType(path.getFullPath());
+      dataTypes.add(dataType);
+      IFill fill;
+      if (!typeIFillMap.containsKey(dataType)) {
+        fill = new PreviousFill(dataType, fillQueryPlan.getQueryTime(), 0);
+      } else {
+        fill = typeIFillMap.get(dataType).copy(path);
+      }
+      fill.setDataType(dataType);
+      fill.setQueryTime(fillQueryPlan.getQueryTime());
+      fill.constructReaders(queryDataSource, context);
+      selectSeriesReaders.put(path.getFullPath(),
+          new ClusterFillSelectSeriesBatchReader(dataType, 
fill.getFillResult()));
+      dataTypeMap.put(path.getFullPath(), dataType);
     }
-    response.getSeriesDataTypes().put(pathType, dataTypes);
-    filterReader = new ClusterFilterSeriesBatchReader(queryDataSet, paths, 
request.getFilterList());
+
+    response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes);
   }
 
   /**
@@ -188,13 +213,32 @@ public class ClusterLocalSingleQueryManager implements 
IClusterLocalSingleQueryM
       IPointReader reader = AbstractExecutorWithoutTimeGenerator
           .createSeriesReader(context, paths.get(i), dataTypes, timeFilter);
       selectSeriesReaders
-          .put(fullPath, new 
ClusterBatchReaderWithoutTimeGenerator(dataTypes.get(i), reader));
+          .put(fullPath, new ClusterSelectSeriesBatchReader(dataTypes.get(i), 
reader));
       dataTypeMap.put(fullPath, dataTypes.get(i));
     }
     response.getSeriesDataTypes().put(PathType.SELECT_PATH, dataTypes);
   }
 
   /**
+   * Handle filter series reader
+   *
+   * @param plan filter series query plan
+   */
+  private void handleFilterSeriesReader(QueryPlan plan, QueryContext context,
+      InitSeriesReaderRequest request, InitSeriesReaderResponse response, 
PathType pathType)
+      throws PathErrorException, QueryFilterOptimizationException, 
FileNodeManagerException, ProcessorException, IOException {
+    QueryDataSet queryDataSet = queryProcessExecutor
+        .processQuery(plan, context);
+    List<Path> paths = plan.getPaths();
+    List<TSDataType> dataTypes = queryDataSet.getDataTypes();
+    for (int i = 0; i < paths.size(); i++) {
+      dataTypeMap.put(paths.get(i).getFullPath(), dataTypes.get(i));
+    }
+    response.getSeriesDataTypes().put(pathType, dataTypes);
+    filterReader = new ClusterFilterSeriesBatchReader(queryDataSet, paths, 
request.getFilterList());
+  }
+
+  /**
    * Handle select series query with value filter
    *
    * @param plan plan query plan
@@ -212,7 +256,7 @@ public class ClusterLocalSingleQueryManager implements 
IClusterLocalSingleQueryM
           .createReaderByTimeStamp(path, context);
       TSDataType dataType = 
MManager.getInstance().getSeriesType(path.getFullPath());
       selectSeriesReaders
-          .put(path.getFullPath(), new 
ClusterBatchReaderByTimestamp(readerByTimeStamp, dataType));
+          .put(path.getFullPath(), new 
ClusterSelectSeriesBatchReaderByTimestamp(readerByTimeStamp, dataType));
       dataTypeMap.put(path.getFullPath(), dataType);
       dataTypeList.add(dataType);
     }
@@ -253,7 +297,7 @@ public class ClusterLocalSingleQueryManager implements 
IClusterLocalSingleQueryM
       this.queryRound = targetQueryRounds;
       List<BatchData> batchDataList = new ArrayList<>();
       for (String series : fetchDataSeries) {
-        AbstractClusterBatchReader reader = selectSeriesReaders.get(series);
+        AbstractClusterSelectSeriesBatchReader reader = 
selectSeriesReaders.get(series);
         batchDataList.add(reader.nextBatch(request.getBatchTimestamp()));
       }
       cachedBatchDataResult = batchDataList;
@@ -309,7 +353,7 @@ public class ClusterLocalSingleQueryManager implements 
IClusterLocalSingleQueryM
     return queryRound;
   }
 
-  public Map<String, AbstractClusterBatchReader> getSelectSeriesReaders() {
+  public Map<String, AbstractClusterSelectSeriesBatchReader> 
getSelectSeriesReaders() {
     return selectSeriesReaders;
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java
index 72c7c70..3f73160 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/coordinatornode/AbstractClusterPointReader.java
@@ -20,9 +20,9 @@ package org.apache.iotdb.cluster.query.reader.coordinatornode;
 
 import java.io.IOException;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.query.utils.ClusterTimeValuePairUtils;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.db.utils.TimeValuePairUtils;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
 /**
@@ -63,7 +63,8 @@ public abstract class AbstractClusterPointReader implements 
IPointReader {
   @Override
   public TimeValuePair next() throws IOException {
     if (hasNext()) {
-      TimeValuePair timeValuePair = 
TimeValuePairUtils.getCurrentTimeValuePair(currentBatchData);
+      TimeValuePair timeValuePair = ClusterTimeValuePairUtils
+          .getCurrentTimeValuePair(currentBatchData);
       currentTimeValuePair = timeValuePair;
       currentBatchData.next();
       return timeValuePair;
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterSelectSeriesBatchReader.java
similarity index 93%
copy from 
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java
copy to 
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterSelectSeriesBatchReader.java
index b0a86bd..6fe28e2 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterSelectSeriesBatchReader.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 /**
  * Cluster batch reader, which provides another method to get batch data by 
batch timestamp.
  */
-public abstract class AbstractClusterBatchReader implements IBatchReader {
+public abstract class AbstractClusterSelectSeriesBatchReader implements 
IBatchReader {
 
   /**
    * Get batch data by batch time
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
similarity index 62%
copy from 
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java
copy to 
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
index b0a86bd..55639a1 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterFillSelectSeriesBatchReader.java
@@ -19,21 +19,21 @@
 package org.apache.iotdb.cluster.query.reader.querynode;
 
 import java.io.IOException;
-import java.util.List;
-import org.apache.iotdb.db.query.reader.IBatchReader;
+import org.apache.iotdb.cluster.query.manager.common.FillBatchData;
+import org.apache.iotdb.db.query.reader.IPointReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 
-/**
- * Cluster batch reader, which provides another method to get batch data by 
batch timestamp.
- */
-public abstract class AbstractClusterBatchReader implements IBatchReader {
+public class ClusterFillSelectSeriesBatchReader extends 
ClusterSelectSeriesBatchReader {
 
-  /**
-   * Get batch data by batch time
-   *
-   * @param batchTime valid batch timestamp
-   * @return corresponding batch data
-   */
-  public abstract BatchData nextBatch(List<Long> batchTime) throws IOException;
+  public ClusterFillSelectSeriesBatchReader(
+      TSDataType dataType,
+      IPointReader reader) {
+    super(dataType, reader);
+  }
 
+  @Override
+  public BatchData nextBatch() throws IOException {
+    return hasNext() ? new FillBatchData(reader.next(), false) : new 
FillBatchData(null, true);
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterBatchReaderWithoutTimeGenerator.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
similarity index 85%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterBatchReaderWithoutTimeGenerator.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
index a59c79c..cbbad2e 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterBatchReaderWithoutTimeGenerator.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReader.java
@@ -30,21 +30,22 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 /**
  * BatchReader without time generator for cluster which is used in query node.
  */
-public class ClusterBatchReaderWithoutTimeGenerator extends 
AbstractClusterBatchReader {
+public class ClusterSelectSeriesBatchReader extends
+    AbstractClusterSelectSeriesBatchReader {
 
   /**
    * Data type
    */
-  private TSDataType dataType;
+  protected TSDataType dataType;
 
   /**
    * Point reader
    */
-  private IPointReader reader;
+  protected IPointReader reader;
 
-  private static final ClusterConfig CLUSTER_CONF = 
ClusterDescriptor.getInstance().getConfig();
+  static final ClusterConfig CLUSTER_CONF = 
ClusterDescriptor.getInstance().getConfig();
 
-  public ClusterBatchReaderWithoutTimeGenerator(
+  public ClusterSelectSeriesBatchReader(
       TSDataType dataType, IPointReader reader) {
     this.dataType = dataType;
     this.reader = reader;
@@ -80,7 +81,7 @@ public class ClusterBatchReaderWithoutTimeGenerator extends 
AbstractClusterBatch
   @Override
   public BatchData nextBatch(List<Long> batchTime) throws IOException {
     throw new IOException(
-        "nextBatch(List<Long> batchTime) in 
ClusterBatchReaderWithoutTimeGenerator is an empty method.");
+        "nextBatch(List<Long> batchTime) in ClusterSelectSeriesBatchReader is 
an empty method.");
   }
 
   public TSDataType getDataType() {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterBatchReaderByTimestamp.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderByTimestamp.java
similarity index 90%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterBatchReaderByTimestamp.java
rename to 
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderByTimestamp.java
index b8c36eb..72dce05 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterBatchReaderByTimestamp.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/ClusterSelectSeriesBatchReaderByTimestamp.java
@@ -27,7 +27,8 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
 /**
  * BatchReader by timestamp for cluster which is used in query node.
  */
-public class ClusterBatchReaderByTimestamp extends AbstractClusterBatchReader {
+public class ClusterSelectSeriesBatchReaderByTimestamp extends
+    AbstractClusterSelectSeriesBatchReader {
 
   /**
    * Reader
@@ -39,7 +40,7 @@ public class ClusterBatchReaderByTimestamp extends 
AbstractClusterBatchReader {
    */
   private TSDataType dataType;
 
-  public ClusterBatchReaderByTimestamp(
+  public ClusterSelectSeriesBatchReaderByTimestamp(
       EngineReaderByTimeStamp readerByTimeStamp,
       TSDataType dataType) {
     this.readerByTimeStamp = readerByTimeStamp;
@@ -54,7 +55,7 @@ public class ClusterBatchReaderByTimestamp extends 
AbstractClusterBatchReader {
   @Override
   public BatchData nextBatch() throws IOException {
     throw new UnsupportedOperationException(
-        "nextBatch() in ClusterBatchReaderByTimestamp is an empty method.");
+        "nextBatch() in ClusterSelectSeriesBatchReaderByTimestamp is an empty 
method.");
   }
 
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
new file mode 100644
index 0000000..a0ee256
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterTimeValuePairUtils.java
@@ -0,0 +1,26 @@
+package org.apache.iotdb.cluster.query.utils;
+
+import org.apache.iotdb.cluster.query.manager.common.FillBatchData;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.utils.TimeValuePairUtils;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+
+public class ClusterTimeValuePairUtils {
+
+  private ClusterTimeValuePairUtils() {
+  }
+
+  /**
+   * get given data's current (time,value) pair.
+   *
+   * @param data -batch data
+   * @return -given data's (time,value) pair
+   */
+  public static TimeValuePair getCurrentTimeValuePair(BatchData data) {
+    if (data instanceof FillBatchData) {
+      return ((FillBatchData) data).getTimeValuePair();
+    } else {
+      return TimeValuePairUtils.getCurrentTimeValuePair(data);
+    }
+  }
+}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
index be762e1..546282a 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/QueryPlanPartitionUtils.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.cluster.query.utils;
 
 import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -27,8 +29,11 @@ import 
org.apache.iotdb.cluster.query.manager.coordinatornode.FilterGroupEntity;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.ExpressionType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
@@ -43,12 +48,17 @@ public class QueryPlanPartitionUtils {
   }
 
   /**
-   * Split query plan with no filter or with only global time filter by group 
id
+   * Split a query plan with no filter, with only a global time filter, or a 
fill query, by group id
    */
   public static void splitQueryPlanWithoutValueFilter(
       ClusterRpcSingleQueryManager singleQueryManager)
       throws PathErrorException {
-    splitQueryPlanBySelectPath(singleQueryManager);
+    QueryPlan queryPlan = singleQueryManager.getOriginQueryPlan();
+    if (queryPlan instanceof FillQueryPlan) {
+      splitFillPlan((FillQueryPlan) queryPlan, singleQueryManager);
+    } else {
+      splitQueryPlanBySelectPath(singleQueryManager);
+    }
   }
 
   /**
@@ -93,7 +103,7 @@ public class QueryPlanPartitionUtils {
     }
   }
 
-  private static void splitGroupByPlan(GroupByPlan queryPlan,
+  private static void splitGroupByPlan(GroupByPlan groupByPlan,
       ClusterRpcSingleQueryManager singleQueryManager) {
     throw new UnsupportedOperationException();
   }
@@ -103,6 +113,31 @@ public class QueryPlanPartitionUtils {
     throw new UnsupportedOperationException();
   }
 
+  private static void splitFillPlan(FillQueryPlan fillQueryPlan,
+      ClusterRpcSingleQueryManager singleQueryManager) throws 
PathErrorException {
+    List<Path> selectPaths = fillQueryPlan.getPaths();
+    Map<String, List<Path>> selectSeriesByGroupId = 
singleQueryManager.getSelectSeriesByGroupId();
+    Map<String, QueryPlan> selectPathPlans = 
singleQueryManager.getSelectPathPlans();
+    for (Path path : selectPaths) {
+      String groupId = QPExecutorUtils.getGroupIdByDevice(path.getDevice());
+      if (!selectSeriesByGroupId.containsKey(groupId)) {
+        selectSeriesByGroupId.put(groupId, new ArrayList<>());
+      }
+      selectSeriesByGroupId.get(groupId).add(path);
+    }
+    for (Entry<String, List<Path>> entry : selectSeriesByGroupId.entrySet()) {
+      String groupId = entry.getKey();
+      List<Path> paths = entry.getValue();
+      FillQueryPlan subQueryPlan = new FillQueryPlan();
+      subQueryPlan.setProposer(fillQueryPlan.getProposer());
+      subQueryPlan.setPaths(paths);
+      subQueryPlan.setExpression(fillQueryPlan.getExpression());
+      subQueryPlan.setQueryTime(fillQueryPlan.getQueryTime());
+      subQueryPlan.setFillType(new EnumMap<>(fillQueryPlan.getFillType()));
+      selectPathPlans.put(groupId, subQueryPlan);
+    }
+  }
+
   private static void splitQueryPlan(QueryPlan queryPlan,
       ClusterRpcSingleQueryManager singleQueryManager) throws 
PathErrorException {
     splitQueryPlanBySelectPath(singleQueryManager);
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java
new file mode 100644
index 0000000..f5bf17f
--- /dev/null
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBFillQueryIT.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.integration;
+
+import static org.apache.iotdb.cluster.utils.Utils.insertData;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.utils.EnvironmentUtils;
+import org.apache.iotdb.cluster.utils.QPExecutorUtils;
+import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IoTDBFillQueryIT {
+
+
+  private Server server;
+  private static final ClusterConfig CLUSTER_CONFIG = 
ClusterDescriptor.getInstance().getConfig();
+  private static final PhysicalNode localNode = new 
PhysicalNode(CLUSTER_CONFIG.getIp(),
+      CLUSTER_CONFIG.getPort());
+
+  private static String[] createSQLs = new String[]{
+      "SET STORAGE GROUP TO root.ln.wf01.wt01",
+      "CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, 
ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=DOUBLE, 
ENCODING=PLAIN",
+      "CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, 
ENCODING=PLAIN"};
+  private static String[] insertSQLs = new String[]{
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(1, 1.1, false, 11)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(2, 2.2, true, 22)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(3, 3.3, false, 33 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(4, 4.4, false, 44)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(5, 5.5, false, 55)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(100, 100.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(150, 200.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(200, 300.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(250, 400.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(300, 500.5, false, 550)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(10, 10.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(20, 20.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(30, 30.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(40, 40.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(50, 50.5, false, 550)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(500, 100.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(510, 200.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(520, 300.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(530, 400.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(540, 500.5, false, 550)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(580, 100.1, false, 110)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(590, 200.2, true, 220)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(600, 300.3, false, 330 )",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(610, 400.4, false, 440)",
+      "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
+          + "values(620, 500.5, false, 550)",
+  };
+
+  private static final String TIMESTAMP_STR = "Time";
+  private static final String TEMPERATURE_STR = 
"root.ln.wf01.wt01.temperature";
+  private static final String STATUS_STR = "root.ln.wf01.wt01.status";
+  private static final String HARDWARE_STR = "root.ln.wf01.wt01.hardware";
+
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.closeMemControl();
+    CLUSTER_CONFIG.createAllPath();
+    server = Server.getInstance();
+    server.start();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    server.stop();
+    QPExecutorUtils.setLocalNodeAddr(localNode.getIp(), localNode.getPort());
+    EnvironmentUtils.cleanEnv();
+  }
+
+
+  @Test
+  public void LinearFillTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "3,3.3,false,33",
+        "70,70.34,false,374",
+        "70,null,null,null",
+        "625,null,false,null"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.
+          getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select temperature,status, 
hardware from "
+          + "root.ln.wf01.wt01 where time = 3 "
+          + "Fill(int32[linear, 5ms, 5ms], double[linear, 5ms, 5ms], 
boolean[previous, 5ms])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(TEMPERATURE_STR)
+            + "," + resultSet.getString(STATUS_STR) + "," + 
resultSet.getString(HARDWARE_STR);
+        System.out.println(ans);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select temperature,status, hardware "
+          + "from root.ln.wf01.wt01 where time = 70 Fill(int32[linear, 500ms, 
500ms], "
+          + "double[linear, 500ms, 500ms], boolean[previous, 500ms])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(TEMPERATURE_STR)
+            + "," + resultSet.getString(STATUS_STR) + "," + 
resultSet.getString(HARDWARE_STR);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+        System.out.println(ans);
+      }
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select temperature,status, hardware "
+          + "from root.ln.wf01.wt01 where time = 70 "
+          + "Fill(int32[linear, 25ms, 25ms], double[linear, 25ms, 25ms], 
boolean[previous, 5ms])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(TEMPERATURE_STR)
+            + "," + resultSet.getString(STATUS_STR) + "," + 
resultSet.getString(HARDWARE_STR);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+        System.out.println(ans);
+      }
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select temperature,status, hardware "
+          + "from root.ln.wf01.wt01 where time = 625 "
+          + "Fill(int32[linear, 25ms, 25ms], double[linear, 25ms, 25ms], 
boolean[previous, 5ms])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(TEMPERATURE_STR)
+            + "," + resultSet.getString(STATUS_STR) + "," + 
resultSet.getString(HARDWARE_STR);
+        System.out.println(cnt + " " + ans);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      statement.close();
+      Assert.assertEquals(retArray1.length, cnt);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void LinearFillRemoteTest() throws SQLException {
+    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+    LinearFillTest();
+  }
+
+  @Test
+  public void PreviousFillTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "3,3.3,false,33",
+        "70,50.5,false,550",
+        "70,null,null,null"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.
+          getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select temperature,status, 
hardware "
+          + "from root.ln.wf01.wt01 where time = 3 "
+          + "Fill(int32[previous, 5ms], double[previous, 5ms], 
boolean[previous, 5ms])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(TEMPERATURE_STR)
+            + "," + resultSet.getString(STATUS_STR) + "," + 
resultSet.getString(HARDWARE_STR);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select temperature,status, hardware "
+          + "from root.ln.wf01.wt01 where time = 70 "
+          + "Fill(int32[previous, 500ms], double[previous, 500ms], 
boolean[previous, 500ms])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(TEMPERATURE_STR)
+            + "," + resultSet.getString(STATUS_STR) + "," + 
resultSet.getString(HARDWARE_STR);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+        System.out.println(ans);
+      }
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select temperature,status, hardware "
+          + "from root.ln.wf01.wt01 where time = 70 "
+          + "Fill(int32[previous, 15ms], double[previous, 15ms], 
boolean[previous, 5ms])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(TEMPERATURE_STR)
+            + "," + resultSet.getString(STATUS_STR) + "," + 
resultSet.getString(HARDWARE_STR);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+        System.out.println(ans);
+      }
+      statement.close();
+      Assert.assertEquals(retArray1.length, cnt);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void PreviousFillRemoteTest() throws SQLException {
+    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+    PreviousFillTest();
+  }
+
+  @Test
+  public void EmptyTimeRangeFillTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "3,3.3,false,33",
+        "70,70.34,false,374"
+    };
+    Connection connection = null;
+    try {
+      connection = DriverManager.
+          getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+      Statement statement = connection.createStatement();
+      boolean hasResultSet = statement.execute("select temperature,status, 
hardware "
+          + "from root.ln.wf01.wt01 where time = 3 "
+          + "Fill(int32[linear], double[linear], boolean[previous])");
+
+      Assert.assertTrue(hasResultSet);
+      ResultSet resultSet = statement.getResultSet();
+      int cnt = 0;
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(TEMPERATURE_STR)
+            + "," + resultSet.getString(STATUS_STR) + "," + 
resultSet.getString(HARDWARE_STR);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+      }
+      statement.close();
+
+      statement = connection.createStatement();
+      hasResultSet = statement.execute("select temperature,status, hardware "
+          + "from root.ln.wf01.wt01 where time = 70 "
+          + "Fill(int32[linear], double[linear], boolean[previous])");
+
+      Assert.assertTrue(hasResultSet);
+      resultSet = statement.getResultSet();
+      while (resultSet.next()) {
+        String ans = resultSet.getString(TIMESTAMP_STR) + "," + 
resultSet.getString(TEMPERATURE_STR)
+            + "," + resultSet.getString(STATUS_STR) + "," + 
resultSet.getString(HARDWARE_STR);
+        Assert.assertEquals(retArray1[cnt], ans);
+        cnt++;
+        System.out.println(ans);
+      }
+      statement.close();
+      Assert.assertEquals(retArray1.length, cnt);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+  }
+
+  @Test
+  public void EmptyTimeRangeFillRemoteTest() throws SQLException {
+    QPExecutorUtils.setLocalNodeAddr("0.0.0.0", 0);
+    EmptyTimeRangeFillTest();
+  }
+
+  private void prepareData() throws SQLException {
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
+            "root")) {
+      insertData(connection, createSQLs, insertSQLs);
+    }
+  }
+}
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryTest.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java
similarity index 99%
rename from 
cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryTest.java
rename to 
cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java
index f5cc295..90d4474 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryIT.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.query;
+package org.apache.iotdb.cluster.integration;
 
 import static org.apache.iotdb.cluster.utils.Utils.insertBatchData;
 import static org.apache.iotdb.cluster.utils.Utils.insertData;
@@ -43,7 +43,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class ClusterQueryTest {
+public class IoTDBQueryIT {
 
   private Server server;
   private static final ClusterConfig CLUSTER_CONFIG = 
ClusterDescriptor.getInstance().getConfig();
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryLargeDataIT.java
similarity index 99%
rename from 
cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
rename to 
cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryLargeDataIT.java
index 223f0dc..926d8a7 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryLargeDataTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBQueryLargeDataIT.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.query;
+package org.apache.iotdb.cluster.integration;
 
 import static org.apache.iotdb.cluster.utils.Utils.insertData;
 import static org.junit.Assert.assertEquals;
@@ -41,7 +41,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class ClusterQueryLargeDataTest {
+public class IoTDBQueryLargeDataIT {
 
 
   private Server server;
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
index c09aaa5..b822831 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/query/manager/ClusterLocalManagerTest.java
@@ -39,10 +39,10 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.entity.Server;
 import 
org.apache.iotdb.cluster.query.manager.querynode.ClusterLocalQueryManager;
 import 
org.apache.iotdb.cluster.query.manager.querynode.ClusterLocalSingleQueryManager;
-import 
org.apache.iotdb.cluster.query.reader.querynode.ClusterBatchReaderByTimestamp;
-import 
org.apache.iotdb.cluster.query.reader.querynode.ClusterBatchReaderWithoutTimeGenerator;
+import 
org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterSelectSeriesBatchReader;
+import 
org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReader;
+import 
org.apache.iotdb.cluster.query.reader.querynode.ClusterSelectSeriesBatchReaderByTimestamp;
 import 
org.apache.iotdb.cluster.query.reader.querynode.ClusterFilterSeriesBatchReader;
-import 
org.apache.iotdb.cluster.query.reader.querynode.AbstractClusterBatchReader;
 import org.apache.iotdb.cluster.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -220,17 +220,17 @@ public class ClusterLocalManagerTest {
         assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
         assertEquals(0, singleQueryManager.getQueryRound());
         assertNull(singleQueryManager.getFilterReader());
-        Map<String, AbstractClusterBatchReader> selectSeriesReaders = 
singleQueryManager
+        Map<String, AbstractClusterSelectSeriesBatchReader> 
selectSeriesReaders = singleQueryManager
             .getSelectSeriesReaders();
         assertEquals(3, selectSeriesReaders.size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        for (Entry<String, AbstractClusterBatchReader> entry : 
selectSeriesReaders.entrySet()) {
+        for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : 
selectSeriesReaders.entrySet()) {
           String path = entry.getKey();
           TSDataType dataType = typeMap.get(path);
-          AbstractClusterBatchReader clusterBatchReader = entry.getValue();
-          assertNotNull(((ClusterBatchReaderWithoutTimeGenerator) 
clusterBatchReader).getReader());
+          AbstractClusterSelectSeriesBatchReader clusterBatchReader = 
entry.getValue();
+          assertNotNull(((ClusterSelectSeriesBatchReader) 
clusterBatchReader).getReader());
           assertEquals(dataType,
-              ((ClusterBatchReaderWithoutTimeGenerator) 
clusterBatchReader).getDataType());
+              ((ClusterSelectSeriesBatchReader) 
clusterBatchReader).getDataType());
         }
       }
 
@@ -247,17 +247,17 @@ public class ClusterLocalManagerTest {
         assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
         assertEquals(0, singleQueryManager.getQueryRound());
         assertNull(singleQueryManager.getFilterReader());
-        Map<String, AbstractClusterBatchReader> selectSeriesReaders = 
singleQueryManager
+        Map<String, AbstractClusterSelectSeriesBatchReader> 
selectSeriesReaders = singleQueryManager
             .getSelectSeriesReaders();
         assertEquals(3, selectSeriesReaders.size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        for (Entry<String, AbstractClusterBatchReader> entry : 
selectSeriesReaders.entrySet()) {
+        for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : 
selectSeriesReaders.entrySet()) {
           String path = entry.getKey();
           TSDataType dataType = typeMap.get(path);
-          AbstractClusterBatchReader clusterBatchReader = entry.getValue();
-          assertNotNull(((ClusterBatchReaderWithoutTimeGenerator) 
clusterBatchReader).getReader());
+          AbstractClusterSelectSeriesBatchReader clusterBatchReader = 
entry.getValue();
+          assertNotNull(((ClusterSelectSeriesBatchReader) 
clusterBatchReader).getReader());
           assertEquals(dataType,
-              ((ClusterBatchReaderWithoutTimeGenerator) 
clusterBatchReader).getDataType());
+              ((ClusterSelectSeriesBatchReader) 
clusterBatchReader).getDataType());
         }
       }
 
@@ -274,17 +274,17 @@ public class ClusterLocalManagerTest {
         assertEquals((long) map.get(taskId), singleQueryManager.getJobId());
         assertEquals(0, singleQueryManager.getQueryRound());
         assertNull(singleQueryManager.getFilterReader());
-        Map<String, AbstractClusterBatchReader> selectSeriesReaders = 
singleQueryManager
+        Map<String, AbstractClusterSelectSeriesBatchReader> 
selectSeriesReaders = singleQueryManager
             .getSelectSeriesReaders();
         assertEquals(3, selectSeriesReaders.size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        for (Entry<String, AbstractClusterBatchReader> entry : 
selectSeriesReaders.entrySet()) {
+        for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : 
selectSeriesReaders.entrySet()) {
           String path = entry.getKey();
           TSDataType dataType = typeMap.get(path);
-          AbstractClusterBatchReader clusterBatchReader = entry.getValue();
-          assertNotNull(((ClusterBatchReaderWithoutTimeGenerator) 
clusterBatchReader).getReader());
+          AbstractClusterSelectSeriesBatchReader clusterBatchReader = 
entry.getValue();
+          assertNotNull(((ClusterSelectSeriesBatchReader) 
clusterBatchReader).getReader());
           assertEquals(dataType,
-              ((ClusterBatchReaderWithoutTimeGenerator) 
clusterBatchReader).getDataType());
+              ((ClusterSelectSeriesBatchReader) 
clusterBatchReader).getDataType());
         }
       }
       statement.close();
@@ -317,18 +317,18 @@ public class ClusterLocalManagerTest {
         
assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath()));
         assertNotNull(filterReader.getQueryDataSet());
 
-        Map<String, AbstractClusterBatchReader> selectSeriesReaders = 
singleQueryManager
+        Map<String, AbstractClusterSelectSeriesBatchReader> 
selectSeriesReaders = singleQueryManager
             .getSelectSeriesReaders();
         assertNotNull(selectSeriesReaders);
         assertEquals(3, selectSeriesReaders.size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        for (Entry<String, AbstractClusterBatchReader> entry : 
selectSeriesReaders.entrySet()) {
+        for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : 
selectSeriesReaders.entrySet()) {
           String path = entry.getKey();
           TSDataType dataType = typeMap.get(path);
-          AbstractClusterBatchReader clusterBatchReader = entry.getValue();
-          assertNotNull(((ClusterBatchReaderByTimestamp) 
clusterBatchReader).getReaderByTimeStamp());
+          AbstractClusterSelectSeriesBatchReader clusterBatchReader = 
entry.getValue();
+          assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) 
clusterBatchReader).getReaderByTimeStamp());
           assertEquals(dataType,
-              ((ClusterBatchReaderByTimestamp) 
clusterBatchReader).getDataType());
+              ((ClusterSelectSeriesBatchReaderByTimestamp) 
clusterBatchReader).getDataType());
         }
       }
 
@@ -351,18 +351,18 @@ public class ClusterLocalManagerTest {
         
assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath()));
         assertNotNull(filterReader.getQueryDataSet());
 
-        Map<String, AbstractClusterBatchReader> selectSeriesReaders = 
singleQueryManager
+        Map<String, AbstractClusterSelectSeriesBatchReader> 
selectSeriesReaders = singleQueryManager
             .getSelectSeriesReaders();
         assertNotNull(selectSeriesReaders);
         assertEquals(3, selectSeriesReaders.size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        for (Entry<String, AbstractClusterBatchReader> entry : 
selectSeriesReaders.entrySet()) {
+        for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : 
selectSeriesReaders.entrySet()) {
           String path = entry.getKey();
           TSDataType dataType = typeMap.get(path);
-          AbstractClusterBatchReader clusterBatchReader = entry.getValue();
-          assertNotNull(((ClusterBatchReaderByTimestamp) 
clusterBatchReader).getReaderByTimeStamp());
+          AbstractClusterSelectSeriesBatchReader clusterBatchReader = 
entry.getValue();
+          assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) 
clusterBatchReader).getReaderByTimeStamp());
           assertEquals(dataType,
-              ((ClusterBatchReaderByTimestamp) 
clusterBatchReader).getDataType());
+              ((ClusterSelectSeriesBatchReaderByTimestamp) 
clusterBatchReader).getDataType());
         }
       }
 
@@ -385,18 +385,18 @@ public class ClusterLocalManagerTest {
         
assertTrue(allFilterPaths.containsAll(filterReader.getAllFilterPath()));
         assertNotNull(filterReader.getQueryDataSet());
 
-        Map<String, AbstractClusterBatchReader> selectSeriesReaders = 
singleQueryManager
+        Map<String, AbstractClusterSelectSeriesBatchReader> 
selectSeriesReaders = singleQueryManager
             .getSelectSeriesReaders();
         assertNotNull(selectSeriesReaders);
         assertEquals(3, selectSeriesReaders.size());
         Map<String, TSDataType> typeMap = singleQueryManager.getDataTypeMap();
-        for (Entry<String, AbstractClusterBatchReader> entry : 
selectSeriesReaders.entrySet()) {
+        for (Entry<String, AbstractClusterSelectSeriesBatchReader> entry : 
selectSeriesReaders.entrySet()) {
           String path = entry.getKey();
           TSDataType dataType = typeMap.get(path);
-          AbstractClusterBatchReader clusterBatchReader = entry.getValue();
-          assertNotNull(((ClusterBatchReaderByTimestamp) 
clusterBatchReader).getReaderByTimeStamp());
+          AbstractClusterSelectSeriesBatchReader clusterBatchReader = 
entry.getValue();
+          assertNotNull(((ClusterSelectSeriesBatchReaderByTimestamp) 
clusterBatchReader).getReaderByTimeStamp());
           assertEquals(dataType,
-              ((ClusterBatchReaderByTimestamp) 
clusterBatchReader).getDataType());
+              ((ClusterSelectSeriesBatchReaderByTimestamp) 
clusterBatchReader).getDataType());
         }
       }
       statement.close();
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
index 83c5fa9..904bc2d 100644
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/FillEngineExecutor.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.db.query.executor;
 
 import java.io.IOException;
@@ -37,7 +36,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
-public class FillEngineExecutor {
+public class FillEngineExecutor implements IFillEngineExecutor{
 
   private long jobId;
   private List<Path> selectedSeries;
@@ -52,11 +51,7 @@ public class FillEngineExecutor {
     this.typeIFillMap = typeIFillMap;
   }
 
-  /**
-   * execute fill.
-   *
-   * @param context query context
-   */
+  @Override
   public QueryDataSet execute(QueryContext context)
       throws FileNodeManagerException, PathErrorException, IOException {
     QueryResourceManager.getInstance().beginQueryOfGivenQueryPaths(jobId, 
selectedSeries);
@@ -68,7 +63,7 @@ public class FillEngineExecutor {
           .getQueryDataSource(path, context);
       TSDataType dataType = 
MManager.getInstance().getSeriesType(path.getFullPath());
       dataTypeList.add(dataType);
-      IFill fill = null;
+      IFill fill;
       if (!typeIFillMap.containsKey(dataType)) {
         fill = new PreviousFill(dataType, queryTime, 0);
       } else {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java
 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IFillEngineExecutor.java
similarity index 60%
rename from 
cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java
rename to 
iotdb/src/main/java/org/apache/iotdb/db/query/executor/IFillEngineExecutor.java
index b0a86bd..9e207ae 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/querynode/AbstractClusterBatchReader.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/IFillEngineExecutor.java
@@ -16,24 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.cluster.query.reader.querynode;
+package org.apache.iotdb.db.query.executor;
 
 import java.io.IOException;
-import java.util.List;
-import org.apache.iotdb.db.query.reader.IBatchReader;
-import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
-/**
- * Cluster batch reader, which provides another method to get batch data by 
batch timestamp.
- */
-public abstract class AbstractClusterBatchReader implements IBatchReader {
+public interface IFillEngineExecutor {
 
   /**
-   * Get batch data by batch time
+   * execute fill.
    *
-   * @param batchTime valid batch timestamp
-   * @return corresponding batch data
+   * @param context query context
    */
-  public abstract BatchData nextBatch(List<Long> batchTime) throws IOException;
-
+  QueryDataSet execute(QueryContext context)
+      throws FileNodeManagerException, PathErrorException, IOException;
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
index d64b49a..9f9a050 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.db.query.fill;
 
 import java.io.IOException;
+import java.io.Serializable;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
@@ -33,12 +33,13 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-public abstract class IFill {
+public abstract class IFill implements Serializable {
 
+  private static final long serialVersionUID = -357739398193527464L;
   long queryTime;
   TSDataType dataType;
 
-  IPointReader allDataReader;
+  transient IPointReader allDataReader;
 
   public IFill(TSDataType dataType, long queryTime) {
     this.dataType = dataType;
@@ -106,8 +107,11 @@ public abstract class IFill {
 
     @Override
     public TimeValuePair next() {
-      isUsed = true;
-      return pair;
+      if (!isUsed) {
+        isUsed = true;
+        return pair;
+      }
+      return null;
     }
 
     @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
index dc46082..399e2b8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.query.fill;
 
 import java.io.IOException;
+import java.io.Serializable;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.UnSupportedFillTypeException;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -29,8 +30,9 @@ import org.apache.iotdb.db.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 
-public class LinearFill extends IFill {
+public class LinearFill extends IFill implements Serializable {
 
+  private static final long serialVersionUID = -1774599523110930574L;
   private long beforeRange;
   private long afterRange;
 
diff --git 
a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java 
b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
index b75fb4f..93c50a8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.query.fill;
 
 import java.io.IOException;
+import java.io.Serializable;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -26,8 +27,9 @@ import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 
-public class PreviousFill extends IFill {
+public class PreviousFill extends IFill implements Serializable {
 
+  private static final long serialVersionUID = -7946089166912781464L;
   private long beforeRange;
 
   public PreviousFill(TSDataType dataType, long queryTime, long beforeRange) {
@@ -57,7 +59,7 @@ public class PreviousFill extends IFill {
   @Override
   public IPointReader getFillResult() throws IOException {
     TimeValuePair beforePair = null;
-    TimeValuePair cachedPair = null;
+    TimeValuePair cachedPair;
     while (allDataReader.hasNext()) {
       cachedPair = allDataReader.next();
       if (cachedPair.getTimestamp() <= queryTime) {

Reply via email to