This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-3724
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 51e7250a09a2f632f122528fbfd0562a6e4393fb
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Jul 4 18:55:31 2022 +0800

    [IOTDB-3724] Fix Incorrect result when querying with linear fill & order by 
time desc
---
 .../iotdb/db/it/query/IoTDBNullValueFillIT.java    |   2 -
 .../process/fill/linear/DoubleLinearFill.java      |   5 +
 .../process/fill/linear/FloatLinearFill.java       |   5 +
 .../process/fill/linear/IntLinearFill.java         |   5 +
 .../operator/process/fill/linear/LinearFill.java   |  27 +-
 .../process/fill/linear/LongLinearFill.java        |   5 +
 .../operator/process/merge/AscTimeComparator.java  |   5 +
 .../operator/process/merge/DescTimeComparator.java |   5 +
 .../operator/process/merge/TimeComparator.java     |   5 +
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  14 +-
 .../execution/operator/LinearFillOperatorTest.java | 532 ++++++++++++++++++++-
 11 files changed, 585 insertions(+), 25 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
index 59264a60ad..46f4f9e5b7 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.itbase.category.ClusterIT;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -218,7 +217,6 @@ public class IoTDBNullValueFillIT {
   }
 
   @Test
-  @Ignore // TODO fix IOTDB-3724
   public void linearFillTest() {
     String[] expectedHeader =
         new String[] {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
index 52a228b1e4..ca8f558647 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
 
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
@@ -33,6 +34,10 @@ public class DoubleLinearFill extends LinearFill {
 
   private double nextValueInCurrentColumn;
 
+  public DoubleLinearFill(boolean ascending, TimeComparator timeComparator) {
+    super(ascending, timeComparator);
+  }
+
   @Override
   void fillValue(Column column, int index, Object array) {
     ((double[]) array)[index] = column.getDouble(index);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
index a32c60d61e..803c55ec8f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
 
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
@@ -33,6 +34,10 @@ public class FloatLinearFill extends LinearFill {
 
   private float nextValueInCurrentColumn;
 
+  public FloatLinearFill(boolean ascending, TimeComparator timeComparator) {
+    super(ascending, timeComparator);
+  }
+
   @Override
   void fillValue(Column column, int index, Object array) {
     ((float[]) array)[index] = column.getFloat(index);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
index baf4806551..4876ed23b8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
 
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
@@ -33,6 +34,10 @@ public class IntLinearFill extends LinearFill {
 
   private int nextValueInCurrentColumn;
 
+  public IntLinearFill(boolean ascending, TimeComparator timeComparator) {
+    super(ascending, timeComparator);
+  }
+
   @Override
   void fillValue(Column column, int index, Object array) {
     ((int[]) array)[index] = column.getInt(index);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
index 0e26d2baf5..92685b9c38 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
 
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
@@ -28,17 +29,25 @@ import static 
com.google.common.base.Preconditions.checkArgument;
  * The result of Linear Fill functions at timestamp "T" is calculated by 
performing a linear fitting
  * method on two time series values, one is at the closest timestamp before T, 
and the other is at
  * the closest timestamp after T. Linear Fill function calculation only 
supports numeric types
- * including int, double and float.
+ * including long, int, double and float.
  */
 public abstract class LinearFill {
 
+  private final TimeComparator timeComparator;
+
   // whether previous value is null
   protected boolean previousIsNull = true;
   // time of next value
-  protected long nextTime = Long.MIN_VALUE;
+  protected long nextTime;
 
   protected long nextTimeInCurrentColumn;
 
+  public LinearFill(boolean ascending, TimeComparator timeComparator) {
+    this.nextTime = ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
+    this.nextTimeInCurrentColumn = ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
+    this.timeComparator = timeComparator;
+  }
+
   /**
    * Before we call this method, we need to make sure the nextValue has been 
prepared or noMoreNext
    * has been set to true
@@ -65,7 +74,7 @@ public abstract class LinearFill {
     // if its values are all null
     if (valueColumn instanceof RunLengthEncodedColumn) {
       // previous value is null or next value is null, we just return 
NULL_VALUE_BLOCK
-      if (previousIsNull || nextTime < timeColumn.getStartTime()) {
+      if (previousIsNull || timeComparator.inFillBound(nextTime, 
timeColumn.getStartTime())) {
         return new RunLengthEncodedColumn(createNullValueColumn(), size);
       } else {
         prepareForNextValueInCurrentColumn(
@@ -111,7 +120,8 @@ public abstract class LinearFill {
    *     information, and we can directly call fill() function
    */
   public boolean needPrepareForNext(long time, Column valueColumn) {
-    return time > nextTime && 
valueColumn.isNull(valueColumn.getPositionCount() - 1);
+    return timeComparator.inFillBound(nextTime, time)
+        && valueColumn.isNull(valueColumn.getPositionCount() - 1);
   }
 
   /**
@@ -124,9 +134,10 @@ public abstract class LinearFill {
    */
   public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column 
nextValueColumn) {
     checkArgument(
-        nextTimeColumn.getPositionCount() > 0 && nextTimeColumn.getLong(0) > 
time,
+        nextTimeColumn.getPositionCount() > 0
+            && timeComparator.inFillBound(time, nextTimeColumn.getLong(0)),
         "nextColumn's time should be greater than current time");
-    if (time <= nextTime) {
+    if (timeComparator.satisfyCurEndTime(time, nextTime)) {
       return true;
     }
 
@@ -141,12 +152,12 @@ public abstract class LinearFill {
   }
 
   private boolean nextIsNull(long time) {
-    return nextTimeInCurrentColumn <= time;
+    return timeComparator.satisfyCurEndTime(nextTimeInCurrentColumn, time);
   }
 
   private void prepareForNextValueInCurrentColumn(
       long time, int startIndex, TimeColumn timeColumn, Column valueColumn) {
-    if (time <= nextTimeInCurrentColumn) {
+    if (timeComparator.satisfyCurEndTime(time, nextTimeInCurrentColumn)) {
       return;
     }
     for (int i = startIndex; i < valueColumn.getPositionCount(); i++) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
index 04dba1613a..5f6986ad2e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
 
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
@@ -33,6 +34,10 @@ public class LongLinearFill extends LinearFill {
 
   private long nextValueInCurrentColumn;
 
+  public LongLinearFill(boolean ascending, TimeComparator timeComparator) {
+    super(ascending, timeComparator);
+  }
+
   @Override
   void fillValue(Column column, int index, Object array) {
     ((long[]) array)[index] = column.getLong(index);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
index 95b7316844..4aa723ad97 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
@@ -30,4 +30,9 @@ public class AscTimeComparator implements TimeComparator {
   public long getCurrentEndTime(long time1, long time2) {
     return Math.min(time1, time2);
   }
+
+  @Override
+  public boolean inFillBound(long time, long timeBound) {
+    return time < timeBound;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
index f53c97d8fe..495cd2852a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
@@ -30,4 +30,9 @@ public class DescTimeComparator implements TimeComparator {
   public long getCurrentEndTime(long time1, long time2) {
     return Math.max(time1, time2);
   }
+
+  @Override
+  public boolean inFillBound(long time, long timeBound) {
+    return time > timeBound;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
index db017f6186..a2f2c076da 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
@@ -25,4 +25,9 @@ public interface TimeComparator {
 
   /** @return min(time1, time2) if order by time asc, max(time1, time2) if 
order by desc */
   long getCurrentEndTime(long time1, long time2);
+
+  /**
+   * @return true if time < timeBound && order by time asc, time > timeBound 
&& order by time desc
+   */
+  boolean inFillBound(long time, long timeBound);
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 522b502d37..2659a6fb86 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -665,7 +665,7 @@ public class LocalExecutionPlanner {
                   context.getNextOperatorId(),
                   node.getPlanNodeId(),
                   LinearFillOperator.class.getSimpleName()),
-              getLinearFill(inputColumns, inputDataTypes),
+              getLinearFill(inputColumns, inputDataTypes, false),
               child);
         default:
           throw new IllegalArgumentException("Unknown fill policy: " + 
fillPolicy);
@@ -731,21 +731,23 @@ public class LocalExecutionPlanner {
       return previousFill;
     }
 
-    private LinearFill[] getLinearFill(int inputColumns, List<TSDataType> 
inputDataTypes) {
+    private LinearFill[] getLinearFill(
+        int inputColumns, List<TSDataType> inputDataTypes, boolean ascending) {
       LinearFill[] linearFill = new LinearFill[inputColumns];
+      TimeComparator timeComparator = ascending ? ASC_TIME_COMPARATOR : 
DESC_TIME_COMPARATOR;
       for (int i = 0; i < inputColumns; i++) {
         switch (inputDataTypes.get(i)) {
           case INT32:
-            linearFill[i] = new IntLinearFill();
+            linearFill[i] = new IntLinearFill(ascending, timeComparator);
             break;
           case INT64:
-            linearFill[i] = new LongLinearFill();
+            linearFill[i] = new LongLinearFill(ascending, timeComparator);
             break;
           case FLOAT:
-            linearFill[i] = new FloatLinearFill();
+            linearFill[i] = new FloatLinearFill(ascending, timeComparator);
             break;
           case DOUBLE:
-            linearFill[i] = new DoubleLinearFill();
+            linearFill[i] = new DoubleLinearFill(ascending, timeComparator);
             break;
           case BOOLEAN:
           case TEXT:
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
index 2fb9898418..d44d72ec30 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
@@ -27,6 +27,9 @@ import 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -59,12 +62,14 @@ public class LinearFillOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, LinearFillOperator.class.getSimpleName());
 
+      boolean ascending = true;
+      TimeComparator timeComparator = new AscTimeComparator();
       LinearFill[] fillArray =
           new LinearFill[] {
-            new FloatLinearFill(),
-            new FloatLinearFill(),
-            new FloatLinearFill(),
-            new FloatLinearFill()
+            new FloatLinearFill(ascending, timeComparator),
+            new FloatLinearFill(ascending, timeComparator),
+            new FloatLinearFill(ascending, timeComparator),
+            new FloatLinearFill(ascending, timeComparator)
           };
       LinearFillOperator fillOperator =
           new LinearFillOperator(
@@ -241,6 +246,206 @@ public class LinearFillOperatorTest {
     }
   }
 
+  @Test
+  public void batchLinearFillTest1OrderByDesc() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+      boolean ascending = false;
+      TimeComparator timeComparator = new DescTimeComparator();
+      LinearFill[] fillArray =
+          new LinearFill[] {
+              new FloatLinearFill(ascending, timeComparator),
+              new FloatLinearFill(ascending, timeComparator),
+              new FloatLinearFill(ascending, timeComparator),
+              new FloatLinearFill(ascending, timeComparator)
+          };
+      LinearFillOperator fillOperator =
+          new LinearFillOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              fillArray,
+              new Operator() {
+                private int index = 0;
+                private final float[][][] value =
+                    new float[][][] {
+                        {
+                            {1.0f, 0.0f, 3.0f, 4.0f},
+                            {11.0f, 12.0f, 13.0f, 0.0f},
+                            {21.0f, 22.0f, 0.0f, 0.0f},
+                            {0.0f, 32.0f, 0.0f, 0.0f},
+                            {0.0f, 0.0f, 43.0f, 0.0f}
+                        },
+                        {
+                            {51.0f, 0.0f, 53.0f, 0.0f},
+                            {61.0f, 62.0f, 63.0f, 0.0f},
+                            {71.0f, 72.0f, 0.0f, 74.0f},
+                            {0.0f, 82.0f, 0.0f, 0.0f},
+                            {0.0f, 0.0f, 93.0f, 0.0f}
+                        },
+                        {
+                            {101.0f, 0.0f, 103.0f, 0.0f},
+                            {111.0f, 112.0f, 113.0f, 114.0f},
+                            {121.0f, 122.0f, 0.0f, 124.0f},
+                            {0.0f, 132.0f, 0.0f, 0.0f},
+                            {0.0f, 0.0f, 143.0f, 0.0f}
+                        }
+                    };
+                final boolean[][][] isNull =
+                    new boolean[][][] {
+                        {
+                            {false, true, false, false},
+                            {false, false, false, true},
+                            {false, false, true, true},
+                            {true, false, true, true},
+                            {true, true, false, true}
+                        },
+                        {
+                            {false, true, false, true},
+                            {false, false, false, true},
+                            {false, false, true, false},
+                            {true, false, true, true},
+                            {true, true, false, true}
+                        },
+                        {
+                            {false, true, false, true},
+                            {false, false, false, false},
+                            {false, false, true, false},
+                            {true, false, true, true},
+                            {true, true, false, true}
+                        }
+                    };
+
+                @Override
+                public OperatorContext getOperatorContext() {
+                  return null;
+                }
+
+                @Override
+                public TsBlock next() {
+                  TsBlockBuilder builder =
+                      new TsBlockBuilder(
+                          ImmutableList.of(
+                              TSDataType.FLOAT,
+                              TSDataType.FLOAT,
+                              TSDataType.FLOAT,
+                              TSDataType.FLOAT));
+                  for (int i = 0; i < 5; i++) {
+                    builder.getTimeColumnBuilder().writeLong((4 - i) + (2 - 
index) * 5L);
+                    for (int j = 0; j < 4; j++) {
+                      if (isNull[index][i][j]) {
+                        builder.getColumnBuilder(j).appendNull();
+                      } else {
+                        
builder.getColumnBuilder(j).writeFloat(value[index][i][j]);
+                      }
+                    }
+                    builder.declarePosition();
+                  }
+                  index++;
+                  return builder.build();
+                }
+
+                @Override
+                public boolean hasNext() {
+                  return index < 3;
+                }
+
+                @Override
+                public boolean isFinished() {
+                  return index >= 3;
+                }
+              });
+
+      float[][][] res =
+          new float[][][] {
+              {
+                  {1.0f, 0.0f, 3.0f, 4.0f},
+                  {11.0f, 12.0f, 13.0f, 39.0f},
+                  {21.0f, 22.0f, 28.0f, 39.0f},
+                  {36.0f, 32.0f, 28.0f, 39.0f},
+                  {36.0f, 47.0f, 43.0f, 39.0f}
+              },
+              {
+                  {51.0f, 47.0f, 53.0f, 39.0f},
+                  {61.0f, 62.0f, 63.0f, 39.0f},
+                  {71.0f, 72.0f, 78.0f, 74.0f},
+                  {86.0f, 82.0f, 78.0f, 94.0f},
+                  {86.0f, 97.0f, 93.0f, 94.0f}
+              },
+              {
+                  {101.0f, 97.0f, 103.0f, 94.0f},
+                  {111.0f, 112.0f, 113.0f, 114.0f},
+                  {121.0f, 122.0f, 128.0f, 124.0f},
+                  {0.0f, 132.0f, 128.0f, 0.0f},
+                  {0.0f, 0.0f, 143.0f, 0.0f}
+              }
+          };
+      boolean[][][] isNull =
+          new boolean[][][] {
+              {
+                  {false, true, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false}
+              },
+              {
+                  {false, false, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false}
+              },
+              {
+                  {false, false, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false},
+                  {true, false, false, true},
+                  {true, true, false, true}
+              }
+          };
+
+      boolean[] nullBlock = new boolean[] {true, false, false, false};
+      int nullBlockIndex = 0;
+      int count = 0;
+      while (fillOperator.hasNext()) {
+        TsBlock block = fillOperator.next();
+        assertEquals(nullBlock[nullBlockIndex++], block == null);
+        if (block == null) {
+          continue;
+        }
+        for (int i = 0; i < block.getPositionCount(); i++) {
+          long expectedTime = (block.getPositionCount() - i - 1) + (res.length 
- count - 1) * 5L;
+          assertEquals(expectedTime, block.getTimeByIndex(i));
+          for (int j = 0; j < 4; j++) {
+            assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+            if (!isNull[count][i][j]) {
+              assertEquals(res[count][i][j], block.getColumn(j).getFloat(i), 
0.00001f);
+            }
+          }
+        }
+        count++;
+      }
+
+      assertTrue(fillOperator.isFinished());
+      assertEquals(res.length, count);
+      assertEquals(nullBlock.length, nullBlockIndex);
+
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
   @Test
   public void batchLinearFillTest2() {
     ExecutorService instanceNotificationExecutor =
@@ -257,12 +462,14 @@ public class LinearFillOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, LinearFillOperator.class.getSimpleName());
 
+      boolean ascending = true;
+      TimeComparator timeComparator = new AscTimeComparator();
       LinearFill[] fillArray =
           new LinearFill[] {
-            new FloatLinearFill(),
-            new FloatLinearFill(),
-            new FloatLinearFill(),
-            new FloatLinearFill()
+            new FloatLinearFill(ascending, timeComparator),
+            new FloatLinearFill(ascending, timeComparator),
+            new FloatLinearFill(ascending, timeComparator),
+            new FloatLinearFill(ascending, timeComparator)
           };
       LinearFillOperator fillOperator =
           new LinearFillOperator(
@@ -439,6 +646,206 @@ public class LinearFillOperatorTest {
     }
   }
 
+  @Test
+  public void batchLinearFillTest2OrderByDesc() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+      boolean ascending = false;
+      TimeComparator timeComparator = new DescTimeComparator();
+      LinearFill[] fillArray =
+          new LinearFill[] {
+              new FloatLinearFill(ascending, timeComparator),
+              new FloatLinearFill(ascending, timeComparator),
+              new FloatLinearFill(ascending, timeComparator),
+              new FloatLinearFill(ascending, timeComparator)
+          };
+      LinearFillOperator fillOperator =
+          new LinearFillOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              fillArray,
+              new Operator() {
+                private int index = 0;
+                private final float[][][] value =
+                    new float[][][] {
+                        {
+                            {1.0f, 0.0f, 3.0f, 4.0f},
+                            {11.0f, 12.0f, 13.0f, 0.0f},
+                            {21.0f, 22.0f, 0.0f, 0.0f},
+                            {0.0f, 32.0f, 0.0f, 0.0f},
+                            {0.0f, 0.0f, 0.0f, 0.0f}
+                        },
+                        {
+                            {51.0f, 0.0f, 0.0f, 0.0f},
+                            {61.0f, 62.0f, 0.0f, 0.0f},
+                            {71.0f, 72.0f, 0.0f, 74.0f},
+                            {0.0f, 82.0f, 0.0f, 0.0f},
+                            {0.0f, 0.0f, 0.0f, 0.0f}
+                        },
+                        {
+                            {101.0f, 0.0f, 103.0f, 0.0f},
+                            {111.0f, 112.0f, 0.0f, 114.0f},
+                            {121.0f, 122.0f, 0.0f, 124.0f},
+                            {0.0f, 132.0f, 0.0f, 0.0f},
+                            {0.0f, 0.0f, 0.0f, 0.0f}
+                        }
+                    };
+                final boolean[][][] isNull =
+                    new boolean[][][] {
+                        {
+                            {false, true, false, false},
+                            {false, false, false, true},
+                            {false, false, true, true},
+                            {true, false, true, true},
+                            {true, true, true, true}
+                        },
+                        {
+                            {false, true, true, true},
+                            {false, false, true, true},
+                            {false, false, true, false},
+                            {true, false, true, true},
+                            {true, true, true, true}
+                        },
+                        {
+                            {false, true, false, true},
+                            {false, false, true, false},
+                            {false, false, true, false},
+                            {true, false, true, true},
+                            {true, true, true, true}
+                        }
+                    };
+
+                @Override
+                public OperatorContext getOperatorContext() {
+                  return null;
+                }
+
+                @Override
+                public TsBlock next() {
+                  TsBlockBuilder builder =
+                      new TsBlockBuilder(
+                          ImmutableList.of(
+                              TSDataType.FLOAT,
+                              TSDataType.FLOAT,
+                              TSDataType.FLOAT,
+                              TSDataType.FLOAT));
+                  for (int i = 0; i < 5; i++) {
+                    builder.getTimeColumnBuilder().writeLong((4 - i) + (2 - 
index) * 5L);
+                    for (int j = 0; j < 4; j++) {
+                      if (isNull[index][i][j]) {
+                        builder.getColumnBuilder(j).appendNull();
+                      } else {
+                        
builder.getColumnBuilder(j).writeFloat(value[index][i][j]);
+                      }
+                    }
+                    builder.declarePosition();
+                  }
+                  index++;
+                  return builder.build();
+                }
+
+                @Override
+                public boolean hasNext() {
+                  return index < 3;
+                }
+
+                @Override
+                public boolean isFinished() {
+                  return index >= 3;
+                }
+              });
+
+      int count = 0;
+      float[][][] res =
+          new float[][][] {
+              {
+                  {1.0f, 0.0f, 3.0f, 4.0f},
+                  {11.0f, 12.0f, 13.0f, 39.0f},
+                  {21.0f, 22.0f, 58.0f, 39.0f},
+                  {36.0f, 32.0f, 58.0f, 39.0f},
+                  {36.0f, 47.0f, 58.0f, 39.0f}
+              },
+              {
+                  {51.0f, 47.0f, 58.0f, 39.0f},
+                  {61.0f, 62.0f, 58.0f, 39.0f},
+                  {71.0f, 72.0f, 58.0f, 74.0f},
+                  {86.0f, 82.0f, 58.0f, 94.0f},
+                  {86.0f, 97.0f, 58.0f, 94.0f}
+              },
+              {
+                  {101.0f, 97.0f, 103.0f, 94.0f},
+                  {111.0f, 112.0f, 0.0f, 114.0f},
+                  {121.0f, 122.0f, 0.0f, 124.0f},
+                  {0.0f, 132.0f, 0.0f, 0.0f},
+                  {0.0f, 0.0f, 0.0f, 0.0f}
+              }
+          };
+      boolean[][][] isNull =
+          new boolean[][][] {
+              {
+                  {false, true, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false}
+              },
+              {
+                  {false, false, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false}
+              },
+              {
+                  {false, false, false, false},
+                  {false, false, true, false},
+                  {false, false, true, false},
+                  {true, false, true, true},
+                  {true, true, true, true}
+              }
+          };
+
+      boolean[] nullBlock = new boolean[] {true, true, false, false, false};
+      int nullBlockIndex = 0;
+      while (fillOperator.hasNext()) {
+        TsBlock block = fillOperator.next();
+        assertEquals(nullBlock[nullBlockIndex++], block == null);
+        if (block == null) {
+          continue;
+        }
+        for (int i = 0; i < block.getPositionCount(); i++) {
+          long expectedTime = (block.getPositionCount() - i - 1) + (res.length 
- count - 1) * 5L;
+          assertEquals(expectedTime, block.getTimeByIndex(i));
+          for (int j = 0; j < 4; j++) {
+            assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+            if (!isNull[count][i][j]) {
+              assertEquals(res[count][i][j], block.getColumn(j).getFloat(i), 
0.00001f);
+            }
+          }
+        }
+        count++;
+      }
+
+      assertTrue(fillOperator.isFinished());
+      assertEquals(res.length, count);
+      assertEquals(nullBlock.length, nullBlockIndex);
+
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
   @Test
   public void batchLinearFillTest3() {
     ExecutorService instanceNotificationExecutor =
@@ -455,7 +862,8 @@ public class LinearFillOperatorTest {
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId1, LinearFillOperator.class.getSimpleName());
 
-      LinearFill[] fillArray = new LinearFill[] {new FloatLinearFill()};
+      LinearFill[] fillArray =
+          new LinearFill[] {new FloatLinearFill(true, new 
AscTimeComparator())};
       LinearFillOperator fillOperator =
           new LinearFillOperator(
               fragmentInstanceContext.getOperatorContexts().get(0),
@@ -543,4 +951,110 @@ public class LinearFillOperatorTest {
       instanceNotificationExecutor.shutdown();
     }
   }
+
+  @Test
+  public void batchLinearFillTest3OrderByDesc() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+      LinearFill[] fillArray =
+          new LinearFill[] {new FloatLinearFill(false, new 
DescTimeComparator())};
+      LinearFillOperator fillOperator =
+          new LinearFillOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              fillArray,
+              new Operator() {
+                private int index = 0;
+                private final float[][][] value =
+                    new float[][][] {
+                        {{0.0f}}, {{2.0f}}, {{3.0f}}, {{4.0f}}, {{0.0f}}, 
{{0.0f}}, {{0.0f}}
+                    };
+                final boolean[][][] isNull =
+                    new boolean[][][] {
+                        {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, 
{{true}}, {{true}}
+                    };
+
+                @Override
+                public OperatorContext getOperatorContext() {
+                  return null;
+                }
+
+                @Override
+                public TsBlock next() {
+                  TsBlockBuilder builder = new 
TsBlockBuilder(ImmutableList.of(TSDataType.FLOAT));
+                  for (int i = 0; i < 1; i++) {
+                    builder.getTimeColumnBuilder().writeLong(i + (6 - index));
+                    for (int j = 0; j < 1; j++) {
+                      if (isNull[index][i][j]) {
+                        builder.getColumnBuilder(j).appendNull();
+                      } else {
+                        
builder.getColumnBuilder(j).writeFloat(value[index][i][j]);
+                      }
+                    }
+                    builder.declarePosition();
+                  }
+                  index++;
+                  return builder.build();
+                }
+
+                @Override
+                public boolean hasNext() {
+                  return index < 7;
+                }
+
+                @Override
+                public boolean isFinished() {
+                  return index >= 7;
+                }
+              });
+
+      int count = 0;
+      float[][][] res =
+          new float[][][] {{{0.0f}}, {{2.0f}}, {{3.0f}}, {{4.0f}}, {{0.0f}}, 
{{0.0f}}, {{0.0f}}};
+      boolean[][][] isNull =
+          new boolean[][][] {
+              {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, {{true}}, 
{{true}}
+          };
+
+      boolean[] nullBlock =
+          new boolean[] {true, false, false, false, false, true, true, true, 
false, false, false};
+      int nullBlockIndex = 0;
+      while (fillOperator.hasNext()) {
+        TsBlock block = fillOperator.next();
+        assertEquals(nullBlock[nullBlockIndex++], block == null);
+        if (block == null) {
+          continue;
+        }
+        for (int i = 0; i < block.getPositionCount(); i++) {
+          long expectedTime = (block.getPositionCount() - i - 1) + (res.length 
- count - 1);
+          assertEquals(expectedTime, block.getTimeByIndex(i));
+          for (int j = 0; j < 1; j++) {
+            assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+            if (!isNull[count][i][j]) {
+              assertEquals(res[count][i][j], block.getColumn(j).getFloat(i), 
0.00001f);
+            }
+          }
+        }
+        count++;
+      }
+
+      assertTrue(fillOperator.isFinished());
+      assertEquals(res.length, count);
+      assertEquals(nullBlock.length, nullBlockIndex);
+
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
 }

Reply via email to