This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ce272a  [IOTDB-632]Performance improve for Linear Fill (#1136)
5ce272a is described below

commit 5ce272a4451427c6c5c040118d6924d103e27845
Author: wshao08 <[email protected]>
AuthorDate: Thu May 7 17:37:06 2020 +0800

    [IOTDB-632]Performance improve for Linear Fill (#1136)
    
    * Re-implement linear fill
---
 docs/SystemDesign/5-DataQuery/9-FillFunction.md    |  12 ++-
 .../2-DML Data Manipulation Language.md            |   3 +
 docs/zh/SystemDesign/5-DataQuery/9-FillFunction.md |  13 ++-
 .../2-DML Data Manipulation Language.md            |   3 +
 .../iotdb/db/qp/logical/crud/QueryOperator.java    |   2 +-
 .../iotdb/db/qp/physical/crud/FillQueryPlan.java   |   2 +-
 .../iotdb/db/qp/physical/crud/GroupByFillPlan.java |   2 +-
 .../iotdb/db/qp/strategy/LogicalGenerator.java     |   6 +-
 .../query/dataset/groupby/GroupByFillDataSet.java  |   4 +-
 .../iotdb/db/query/executor/FillQueryExecutor.java |   4 +-
 .../iotdb/db/query/executor/QueryRouter.java       |   2 +-
 .../iotdb/db/query/{ => executor}/fill/IFill.java  |  16 +--
 .../fill/LastPointReader.java}                     |  89 ++++------------
 .../db/query/{ => executor}/fill/LinearFill.java   |  93 +++++++++++------
 .../iotdb/db/query/executor/fill/PreviousFill.java | 115 +++++++++++++++++++++
 .../apache/iotdb/db/integration/IoTDBFillIT.java   |  52 +++++++++-
 .../apache/iotdb/db/qp/plan/PhysicalPlanTest.java  |   4 +-
 17 files changed, 288 insertions(+), 134 deletions(-)

diff --git a/docs/SystemDesign/5-DataQuery/9-FillFunction.md b/docs/SystemDesign/5-DataQuery/9-FillFunction.md
index 09769b6..b54d299 100644
--- a/docs/SystemDesign/5-DataQuery/9-FillFunction.md
+++ b/docs/SystemDesign/5-DataQuery/9-FillFunction.md
@@ -25,7 +25,7 @@ The main logic of Fill function is in FillQueryExecutor
 
 * org.apache.iotdb.db.query.executor.FillQueryExecutor
 
-Two fill functions are support in IoTDB, Previous Fill and Linear Fill. The logic of Linear Fill is very straightforward. We mainly highlight on introducing Previous Fill in this chapter.
+Two fill functions are support in IoTDB, Previous Fill and Linear Fill. 
 
 # Previous Fill
 
@@ -147,3 +147,13 @@ public TimeValuePair getFillResult() throws IOException {
     return lastPointResult;
 }
 ```
+
+# Linear Fill
+
+The result of Linear Fill functions at timestamp "T" is calculated by performing a linear fitting method on two timeseries values, one is at the closest timestamp before T, and the other is at the closest timestamp after T. Linear Fill function calculation only supports numeric types including int, double and float.
+
+## Calculating before timestamp value
+Before timestamp value is calculated in the same way with Previous Fill.
+
+## Calculating after timestamp value
+For after timestamp value, the time-value pair is generated by two aggregation querys "MIN_TIME" and "FIRST_VALUE". `AggregationExecutor.aggregateOneSeries()` is used to calculate these two aggregation results and compose a time-value pair.
diff --git a/docs/UserGuide/5-Operation Manual/2-DML Data Manipulation Language.md b/docs/UserGuide/5-Operation Manual/2-DML Data Manipulation Language.md
index cab81ad..5e255fa 100644
--- a/docs/UserGuide/5-Operation Manual/2-DML Data Manipulation Language.md     
+++ b/docs/UserGuide/5-Operation Manual/2-DML Data Manipulation Language.md     
@@ -401,6 +401,9 @@ Detailed descriptions of all parameters are given in Table 3-5.
 |before\_range, after\_range|represents the valid time range of the linear method. The previous method works when there are values in the [T-before\_range, T+after\_range] range. When before\_range and after\_range are not explicitly specified, default\_fill\_interval is used. -1 represents infinity; optional field|
 </center>
 
+**Note** if the timeseries has a valid value at query timestamp T, this value will be used as the linear fill value.
+Otherwise, if there is no valid fill value in either range [T-before_range,T] or [T, T + after_range], linear fill method will return null.
+
 Here we give an example of filling null values using the linear method. The SQL statement is as follows:
 
 ```
diff --git a/docs/zh/SystemDesign/5-DataQuery/9-FillFunction.md b/docs/zh/SystemDesign/5-DataQuery/9-FillFunction.md
index dd8292b..0430276 100644
--- a/docs/zh/SystemDesign/5-DataQuery/9-FillFunction.md
+++ b/docs/zh/SystemDesign/5-DataQuery/9-FillFunction.md
@@ -25,7 +25,7 @@
 
 * org.apache.iotdb.db.query.executor.FillQueryExecutor
 
-IoTDB 中支持两种填充方式,Previous填充和Linear填充。Linear填充的实现逻辑较为简单,在此章中重点介绍Previous填充的实现逻辑。
+IoTDB 中支持两种填充方式,Previous填充和Linear填充。
 
 ## Previous填充
 
@@ -146,3 +146,14 @@ public TimeValuePair getFillResult() throws IOException {
     return lastPointResult;
 }
 ```
+
+# Linear 填充
+
+对于T时间的 Linear Fill 线性填充值是由该时间序列的两个相关值做线性拟合得到的:T之前的最近时间戳对应的值,T之后的最早时间戳对应的值。
+基于这种特点,线性填充只能被应用于数字类型如:int, double, float。
+
+## 计算前时间值
+前时间值使用与 Previous Fill 中相同方式计算.
+
+## 计算后时间值
+后时间值使用聚合运算中的"MIN_TIME"和"FIRST_VALUE",分别计算出T时间之后最近的时间戳和对应的值,并组成time-value对返回。
\ No newline at end of file
diff --git a/docs/zh/UserGuide/5-Operation Manual/2-DML Data Manipulation Language.md b/docs/zh/UserGuide/5-Operation Manual/2-DML Data Manipulation Language.md
index f1f05b4..5236823 100644
--- a/docs/zh/UserGuide/5-Operation Manual/2-DML Data Manipulation Language.md  
+++ b/docs/zh/UserGuide/5-Operation Manual/2-DML Data Manipulation Language.md  
@@ -433,6 +433,9 @@ select <path> from <prefixPath> where time = <T> fill(<data_type>[linear, <befor
 
 </center>
 
+需要注意的是一旦时间序列在查询时间戳T时刻存在有效值,线性填充就回使用这个值作为结果返回。
+除此之外,如果在[T-before_range,T]或[T, T + after_range]两个范围中任意一个范围内不存在有效填充值,则线性填充返回null值。
+
 在这里,我们举一个使用线性方法填充空值的示例。 SQL语句如下:
 
 ```
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index a2c18cf..a621c6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.qp.logical.crud;
 
 import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.db.query.executor.fill.IFill;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.Map;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
index d1a2045..bd6c9c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.qp.physical.crud;
 
 import java.util.Map;
 import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.db.query.executor.fill.IFill;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class FillQueryPlan extends RawDataQueryPlan {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByFillPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByFillPlan.java
index 196f56a..1d620c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByFillPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByFillPlan.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.qp.physical.crud;
 
 import org.apache.iotdb.db.qp.logical.Operator;
-import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.db.query.executor.fill.IFill;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.Map;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index 8647ab2..e3a7cee 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -30,9 +30,9 @@ import org.apache.iotdb.db.qp.logical.sys.*;
 import org.apache.iotdb.db.qp.logical.sys.AlterTimeSeriesOperator.AlterType;
 import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
 import org.apache.iotdb.db.qp.strategy.SqlBaseParser.*;
-import org.apache.iotdb.db.query.fill.IFill;
-import org.apache.iotdb.db.query.fill.LinearFill;
-import org.apache.iotdb.db.query.fill.PreviousFill;
+import org.apache.iotdb.db.query.executor.fill.IFill;
+import org.apache.iotdb.db.query.executor.fill.LinearFill;
+import org.apache.iotdb.db.query.executor.fill.PreviousFill;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index ef5bb56..ee6f867 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.GroupByFillPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.executor.LastQueryExecutor;
-import org.apache.iotdb.db.query.fill.IFill;
-import org.apache.iotdb.db.query.fill.PreviousFill;
+import org.apache.iotdb.db.query.executor.fill.IFill;
+import org.apache.iotdb.db.query.executor.fill.PreviousFill;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.Field;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index ae849cf..34ea654 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.SingleDataSet;
-import org.apache.iotdb.db.query.fill.IFill;
-import org.apache.iotdb.db.query.fill.PreviousFill;
+import org.apache.iotdb.db.query.executor.fill.IFill;
+import org.apache.iotdb.db.query.executor.fill.PreviousFill;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.Path;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index c8ca580..eb39d00 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByFillDataSet;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByWithValueFilterDataSet;
 import org.apache.iotdb.db.query.dataset.groupby.GroupByWithoutValueFilterDataSet;
-import org.apache.iotdb.db.query.fill.IFill;
+import org.apache.iotdb.db.query.executor.fill.IFill;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
similarity index 79%
rename from server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
rename to server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
index 675fed8..47bd394 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/IFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java
@@ -17,16 +17,14 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.fill;
+package org.apache.iotdb.db.query.executor.fill;
 
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.exception.query.UnSupportedFillTypeException;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
 import java.util.Set;
@@ -47,14 +45,10 @@ public abstract class IFill {
   public abstract IFill copy();
 
   public abstract void configureFill(Path path, TSDataType dataType, long queryTime,
-      Set<String> sensors, QueryContext context)
-      throws StorageEngineException, QueryProcessException;
+      Set<String> sensors, QueryContext context);
 
-  public Filter getFilter() {
-    return constructFilter();
-  }
-
-  public abstract TimeValuePair getFillResult() throws IOException, UnSupportedFillTypeException;
+  public abstract TimeValuePair getFillResult()
+      throws IOException, QueryProcessException, StorageEngineException;
 
   public TSDataType getDataType() {
     return this.dataType;
@@ -68,5 +62,5 @@ public abstract class IFill {
     this.queryTime = queryTime;
   }
 
-  abstract Filter constructFilter();
+  abstract void constructFilter();
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
similarity index 78%
rename from server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
rename to server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
index be88b15..2e59c31 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/PreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
@@ -16,14 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.query.fill;
+package org.apache.iotdb.db.query.executor.fill;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -32,87 +35,39 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
 import org.apache.iotdb.tsfile.read.reader.IPageReader;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
-import java.io.IOException;
-import java.util.*;
-
-public class PreviousFill extends IFill {
+public class LastPointReader {
 
   private Path seriesPath;
+  long queryTime;
+  TSDataType dataType;
   private QueryContext context;
-  private long beforeRange;
   private Set<String> allSensors;
   private Filter timeFilter;
 
   private QueryDataSource dataSource;
 
-  private List<TimeseriesMetadata> unseqTimeseriesMetadataList;
-
-  private boolean untilLast;
-
-  public PreviousFill(TSDataType dataType, long queryTime, long beforeRange) {
-    this(dataType, queryTime, beforeRange, false);
-  }
-
-  public PreviousFill(long beforeRange) {
-    this(beforeRange, false);
-  }
-
-
-  public PreviousFill(long beforeRange, boolean untilLast) {
-    this.beforeRange = beforeRange;
-    this.untilLast = untilLast;
-  }
+  private List<TimeseriesMetadata> unseqTimeseriesMetadataList = new ArrayList<>();;
 
+  public LastPointReader() {
 
-  public PreviousFill(TSDataType dataType, long queryTime, long beforeRange, boolean untilLast) {
-    super(dataType, queryTime);
-    this.beforeRange = beforeRange;
-    this.unseqTimeseriesMetadataList = new ArrayList<>();
-    this.untilLast = untilLast;
   }
 
-
-
-  @Override
-  public IFill copy() {
-    return new PreviousFill(dataType,  queryTime, beforeRange, untilLast);
-  }
-
-  @Override
-  Filter constructFilter() {
-    Filter lowerBound = beforeRange == -1 ? TimeFilter.gtEq(Long.MIN_VALUE)
-        : TimeFilter.gtEq(queryTime - beforeRange);
-    // time in [queryTime - beforeRange, queryTime]
-    return FilterFactory.and(lowerBound, TimeFilter.ltEq(queryTime));
-  }
-
-  public long getBeforeRange() {
-    return beforeRange;
-  }
-
-  @Override
-  public void configureFill(Path path, TSDataType dataType, long queryTime,
-      Set<String> sensors, QueryContext context)
-      throws StorageEngineException, QueryProcessException {
-    this.seriesPath = path;
+  public LastPointReader(Path seriesPath, TSDataType dataType, Set<String> sensors,
+      QueryContext context, QueryDataSource dataSource, long queryTime, Filter timeFilter) {
+    this.seriesPath = seriesPath;
     this.dataType = dataType;
+    this.dataSource = dataSource;
     this.context = context;
     this.queryTime = queryTime;
     this.allSensors = sensors;
-    this.timeFilter = constructFilter();
-    this.dataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
-    // update filter by TTL
-    timeFilter = dataSource.updateFilterUsingTTL(timeFilter);
+    this.timeFilter = timeFilter;
   }
 
-  @Override
-  public TimeValuePair getFillResult() throws IOException {
+  public TimeValuePair readLastPoint() throws IOException {
     TimeValuePair lastPointResult = retrieveValidLastPointFromSeqFiles();
     UnpackOverlappedUnseqFiles(lastPointResult.getTimestamp());
 
@@ -266,12 +221,4 @@ public class PreviousFill extends IFill {
   private TimeValuePair constructLastPair(long timestamp, Object value, TSDataType dataType) {
     return new TimeValuePair(timestamp, TsPrimitiveType.getByType(dataType, value));
   }
-
-  public boolean isUntilLast() {
-    return untilLast;
-  }
-
-  public void setUntilLast(boolean untilLast) {
-    this.untilLast = untilLast;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LinearFill.java
similarity index 65%
rename from server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
rename to server/src/main/java/org/apache/iotdb/db/query/executor/fill/LinearFill.java
index dd74809..a4aa19b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/fill/LinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LinearFill.java
@@ -17,33 +17,40 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.query.fill;
+package org.apache.iotdb.db.query.executor.fill;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.query.UnSupportedFillTypeException;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.impl.FirstValueAggrResult;
+import org.apache.iotdb.db.query.aggregation.impl.MinTimeAggrResult;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
+import org.apache.iotdb.db.query.executor.AggregationExecutor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
-import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
-import org.apache.iotdb.tsfile.read.reader.IBatchReader;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
 
 public class LinearFill extends IFill {
 
+  private Path seriesPath;
   private long beforeRange;
   private long afterRange;
-  private IBatchReader dataReader;
-  private BatchData batchData;
+  private Filter beforeFilter;
+  private Filter afterFilter;
+  private QueryContext context;
+  private Set<String> allSensors;
 
   public LinearFill(long beforeRange, long afterRange) {
     this.beforeRange = beforeRange;
@@ -58,7 +65,6 @@ public class LinearFill extends IFill {
     super(dataType, queryTime);
     this.beforeRange = beforeRange;
     this.afterRange = afterRange;
-    batchData = new BatchData();
   }
 
   public long getBeforeRange() {
@@ -83,57 +89,76 @@ public class LinearFill extends IFill {
   }
 
   @Override
-  Filter constructFilter() {
+  void constructFilter() {
     Filter lowerBound = beforeRange == -1 ? TimeFilter.gtEq(Long.MIN_VALUE)
         : TimeFilter.gtEq(queryTime - beforeRange);
     Filter upperBound = afterRange == -1 ? TimeFilter.ltEq(Long.MAX_VALUE)
         : TimeFilter.ltEq(queryTime + afterRange);
     // [queryTIme - beforeRange, queryTime + afterRange]
-    return FilterFactory.and(lowerBound, upperBound);
+    beforeFilter = FilterFactory.and(lowerBound, TimeFilter.ltEq(queryTime));
+    afterFilter = FilterFactory.and(TimeFilter.gtEq(queryTime), upperBound);
   }
 
   @Override
-  public void configureFill(Path path, TSDataType dataType, long queryTime,
-      Set<String> sensors, QueryContext context)
-      throws StorageEngineException, QueryProcessException {
+  public void configureFill(
+      Path path, TSDataType dataType, long queryTime, Set<String> sensors, QueryContext context) {
+    this.seriesPath = path;
     this.dataType = dataType;
     this.queryTime = queryTime;
-    Filter timeFilter = constructFilter();
-    dataReader = new SeriesRawDataBatchReader(path, sensors, dataType, context,
-        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter),
-        timeFilter, null, null);
+    this.context = context;
+    this.allSensors = sensors;
+    constructFilter();
   }
 
   @Override
-  public TimeValuePair getFillResult() throws IOException, UnSupportedFillTypeException {
-    TimeValuePair beforePair = null;
-    TimeValuePair afterPair = null;
-    while (batchData.hasCurrent() || dataReader.hasNextBatch()) {
-      if (!batchData.hasCurrent() && dataReader.hasNextBatch()) {
-        batchData = dataReader.nextBatch();
-      }
-      afterPair = new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType());
-      batchData.next();
-      if (afterPair.getTimestamp() <= queryTime) {
-        beforePair = afterPair;
-      } else {
-        break;
-      }
-    }
+  public TimeValuePair getFillResult()
+      throws IOException, QueryProcessException, StorageEngineException {
+    QueryDataSource dataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, beforeFilter);
+    LastPointReader lastReader =
+        new LastPointReader(seriesPath, dataType, allSensors, context, dataSource, queryTime, beforeFilter);
+
+    TimeValuePair beforePair = lastReader.readLastPoint();
+    TimeValuePair afterPair = calculateFirstPointAfterQueryTime();
 
     // no before data or has data on the query timestamp
-    if (beforePair == null || beforePair.getTimestamp() == queryTime) {
+    if (beforePair.getValue() == null || beforePair.getTimestamp() == queryTime) {
+      beforePair.setTimestamp(queryTime);
       return beforePair;
     }
 
     // on after data or after data is out of range
-    if (afterPair.getTimestamp() < queryTime || (afterRange != -1 && afterPair.getTimestamp() > queryTime + afterRange)) {
+    if (afterPair.getValue() == null || afterPair.getTimestamp() < queryTime ||
+        (afterRange != -1 && afterPair.getTimestamp() > queryTime + afterRange)) {
       return new TimeValuePair(queryTime, null);
     }
 
     return average(beforePair, afterPair);
   }
 
+  private TimeValuePair calculateFirstPointAfterQueryTime()
+      throws IOException, StorageEngineException, QueryProcessException {
+    TimeValuePair result = new TimeValuePair(0, null);
+
+    List<AggregateResult> aggregateResultList = new ArrayList<>();
+    AggregateResult minTimeResult = new MinTimeAggrResult();
+    AggregateResult firstValueResult = new FirstValueAggrResult(dataType);
+    aggregateResultList.add(minTimeResult);
+    aggregateResultList.add(firstValueResult);
+    AggregationExecutor.aggregateOneSeries(
+        seriesPath, allSensors, context, afterFilter, dataType, aggregateResultList, null);
+
+    if (minTimeResult.getResult() != null) {
+      long timestamp = (long)(minTimeResult.getResult());
+      result.setTimestamp(timestamp);
+    }
+    if (firstValueResult.getResult() != null) {
+      Object value = firstValueResult.getResult();
+      result.setValue(TsPrimitiveType.getByType(dataType, value));
+    }
+    return result;
+  }
+
   // returns the average of two points
   private TimeValuePair average(TimeValuePair beforePair, TimeValuePair afterPair)
       throws UnSupportedFillTypeException {
@@ -179,4 +204,4 @@ public class LinearFill extends IFill {
     beforePair.setTimestamp(queryTime);
     return beforePair;
   }
-}
\ No newline at end of file
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/PreviousFill.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/PreviousFill.java
new file mode 100644
index 0000000..d43b731
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/PreviousFill.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.query.executor.fill;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class PreviousFill extends IFill {
+
+  private Path seriesPath;
+  private QueryContext context;
+  private long beforeRange;
+  private Set<String> allSensors;
+  private Filter timeFilter;
+
+  private boolean untilLast;
+
+  public PreviousFill(TSDataType dataType, long queryTime, long beforeRange) {
+    this(dataType, queryTime, beforeRange, false);
+  }
+
+  public PreviousFill(long beforeRange) {
+    this(beforeRange, false);
+  }
+
+
+  public PreviousFill(long beforeRange, boolean untilLast) {
+    this.beforeRange = beforeRange;
+    this.untilLast = untilLast;
+  }
+
+
+  public PreviousFill(TSDataType dataType, long queryTime, long beforeRange, boolean untilLast) {
+    super(dataType, queryTime);
+    this.beforeRange = beforeRange;
+    this.untilLast = untilLast;
+  }
+
+  @Override
+  public IFill copy() {
+    return new PreviousFill(dataType,  queryTime, beforeRange, untilLast);
+  }
+
+  @Override
+  void constructFilter() {
+    Filter lowerBound = beforeRange == -1 ? TimeFilter.gtEq(Long.MIN_VALUE)
+        : TimeFilter.gtEq(queryTime - beforeRange);
+    // time in [queryTime - beforeRange, queryTime]
+    timeFilter = FilterFactory.and(lowerBound, TimeFilter.ltEq(queryTime));
+  }
+
+  public long getBeforeRange() {
+    return beforeRange;
+  }
+
+  @Override
+  public void configureFill(
+      Path path, TSDataType dataType, long queryTime, Set<String> sensors, QueryContext context) {
+    this.seriesPath = path;
+    this.dataType = dataType;
+    this.context = context;
+    this.queryTime = queryTime;
+    this.allSensors = sensors;
+    constructFilter();
+  }
+
+  @Override
+  public TimeValuePair getFillResult()
+      throws IOException, QueryProcessException, StorageEngineException {
+    QueryDataSource dataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, timeFilter);
+    // update filter by TTL
+    timeFilter = dataSource.updateFilterUsingTTL(timeFilter);
+    LastPointReader lastReader = new LastPointReader(
+        seriesPath, dataType, allSensors, context, dataSource, queryTime, timeFilter);
+
+    return lastReader.readLastPoint();
+  }
+
+  public boolean isUntilLast() {
+    return untilLast;
+  }
+
+  public void setUntilLast(boolean untilLast) {
+    this.untilLast = untilLast;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
index 134d815..b721d7e 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
@@ -143,12 +143,11 @@ public class IoTDBFillIT {
   }
 
   @Test
-  public void LinearFillTest() throws SQLException {
+  public void LinearFillCommonTest() throws SQLException {
     String[] retArray1 = new String[]{
         "3,3.3,false,33",
         "70,70.34,false,374",
-        "70,null,null,null",
-        "625,null,false,null"
+        "70,70.34,false,374"
     };
     try (Connection connection = DriverManager.
         getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
@@ -186,9 +185,56 @@ public class IoTDBFillIT {
       }
 
       hasResultSet = statement.execute("select temperature,status, hardware "
+          + "from root.ln.wf01.wt01 where time = 70 Fill(int32[linear], "
+          + "double[linear], boolean[previous])");
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+              + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
+          Assert.assertEquals(retArray1[cnt], ans);
+          cnt++;
+        }
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void LinearFillWithBeforeOrAfterValueNullTest() throws SQLException {
+    String[] retArray1 = new String[]{
+        "70,null,null,null",
+        "80,null,null,null",
+        "625,null,false,null"
+    };
+    try (Connection connection = DriverManager.
+        getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+
+      boolean hasResultSet = statement.execute("select temperature,status, hardware "
           + "from root.ln.wf01.wt01 where time = 70 "
           + "Fill(int32[linear, 25ms, 25ms], double[linear, 25ms, 25ms], boolean[previous, 5ms])");
 
+      int cnt = 0;
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(TEMPERATURE_STR_1)
+              + "," + resultSet.getString(STATUS_STR_1) + "," + resultSet.getString(HARDWARE_STR);
+          Assert.assertEquals(retArray1[cnt], ans);
+          cnt++;
+        }
+      }
+
+      hasResultSet = statement.execute("select temperature,status, hardware "
+          + "from root.ln.wf01.wt01 where time = 80 "
+          + "Fill(int32[linear, 25ms, 25ms], double[linear, 25ms, 25ms], boolean[previous, 5ms])");
+
       Assert.assertTrue(hasResultSet);
       try (ResultSet resultSet = statement.getResultSet()) {
         while (resultSet.next()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
index 33f1a69..7fe7250 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/plan/PhysicalPlanTest.java
@@ -29,8 +29,8 @@ import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.*;
 import org.apache.iotdb.db.qp.physical.sys.*;
-import org.apache.iotdb.db.query.fill.LinearFill;
-import org.apache.iotdb.db.query.fill.PreviousFill;
+import org.apache.iotdb.db.query.executor.fill.LinearFill;
+import org.apache.iotdb.db.query.executor.fill.PreviousFill;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
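
Illustrative note (not part of the commit): the rules this change documents for Linear Fill can be sketched in isolation as below. This is a minimal sketch under stated assumptions, not the IoTDB implementation. The class `LinearFillSketch`, the `Point` type and the `linearFill` method are hypothetical names invented for the example; `Point` only stands in for a time-value pair. The sketch assumes the "before" point is the latest sample at or before the query time and the "after" point is the earliest sample at or after it, with -1 meaning an unbounded range as in the user guide.

```java
import java.util.OptionalDouble;

// Hypothetical, self-contained illustration of the linear fill rules; not IoTDB code.
public class LinearFillSketch {

  /** A timestamped numeric sample (stand-in for a time-value pair). */
  public static final class Point {
    final long time;
    final double value;

    public Point(long time, double value) {
      this.time = time;
      this.value = value;
    }
  }

  /**
   * Returns the filled value at queryTime, or empty when no valid fill exists.
   * A range of -1 is treated as unbounded.
   */
  public static OptionalDouble linearFill(
      Point before, Point after, long queryTime, long beforeRange, long afterRange) {
    // No usable point in [queryTime - beforeRange, queryTime]: no fill value.
    if (before == null || before.time > queryTime
        || (beforeRange != -1 && before.time < queryTime - beforeRange)) {
      return OptionalDouble.empty();
    }
    // A sample exists exactly at the query time: use it directly.
    if (before.time == queryTime) {
      return OptionalDouble.of(before.value);
    }
    // No usable point in [queryTime, queryTime + afterRange]: no fill value.
    if (after == null || after.time < queryTime
        || (afterRange != -1 && after.time > queryTime + afterRange)) {
      return OptionalDouble.empty();
    }
    // Here before.time < queryTime <= after.time, so the divisor is non-zero.
    double slopeTimesDelta =
        (after.value - before.value) * (queryTime - before.time) / (after.time - before.time);
    return OptionalDouble.of(before.value + slopeTimesDelta);
  }

  public static void main(String[] args) {
    Point before = new Point(50, 10.0);
    Point after = new Point(100, 20.0);
    // Interpolating at t=70 between (50, 10.0) and (100, 20.0) gives 14.0.
    System.out.println(linearFill(before, after, 70, -1, -1).getAsDouble());
    // With afterRange = 25 the after point (t=100) lies outside [70, 95]: empty.
    System.out.println(linearFill(before, after, 70, -1, 25).isPresent());
  }
}
```

The sketch returns an empty result where the commit's code returns a pair with a null value, and it works on doubles only, whereas the commit handles int, double and float.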
