This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-3722
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e417bcc390506dd9009536a78acf3c463e1239ec
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jul 5 16:59:22 2022 +0800

    [IOTDB-3722] Extend Fill function
---
 .../operator/process/LinearFillOperator.java       |   6 +-
 .../operator/process/fill/ILinearFill.java         |  54 +++++++++++
 .../process/fill/identity/IdentityFill.java        |  30 ++++++
 .../process/fill/identity/IdentityLinearFill.java  |  42 +++++++++
 .../operator/process/fill/linear/LinearFill.java   |   6 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |  31 -------
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  20 +++-
 .../execution/operator/LinearFillOperatorTest.java | 103 +++++++++++++++++++++
 8 files changed, 252 insertions(+), 40 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index 8193e20684..125cda1eae 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
@@ -37,7 +37,7 @@ import static java.util.Objects.requireNonNull;
 public class LinearFillOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
-  private final LinearFill[] fillArray;
+  private final ILinearFill[] fillArray;
   private final Operator child;
   private final int outputColumnCount;
   // TODO need to spill it to disk if it consumes too much memory
@@ -52,7 +52,7 @@ public class LinearFillOperator implements ProcessOperator {
   private boolean noMoreTsBlock;
 
   public LinearFillOperator(
-      OperatorContext operatorContext, LinearFill[] fillArray, Operator child) 
{
+      OperatorContext operatorContext, ILinearFill[] fillArray, Operator 
child) {
     this.operatorContext = requireNonNull(operatorContext, "operatorContext is 
null");
     checkArgument(
         fillArray != null && fillArray.length > 0, "fillArray should not be 
null or empty");
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java
new file mode 100644
index 0000000000..7bdd899ad3
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public interface ILinearFill {
+
+  /**
+   * Before we call this method, we need to make sure the nextValue has been 
prepared or noMoreNext
+   * has been set to true
+   *
+   * @param timeColumn TimeColumn of valueColumn
+   * @param valueColumn valueColumn that need to be filled
+   * @return Value Column that has been filled
+   */
+  Column fill(TimeColumn timeColumn, Column valueColumn);
+
+  /**
+   * @param time end time of current valueColumn that need to be filled
+   * @param valueColumn valueColumn that need to be filled
+   * @return true if valueColumn can't be filled using current information, 
and we need to get next
+   *     TsBlock and then call prepareForNext. false if valueColumn can be 
filled using current
+   *     information, and we can directly call fill() function
+   */
+  boolean needPrepareForNext(long time, Column valueColumn);
+
+  /**
+   * @param time end time of current valueColumn that need to be filled
+   * @param nextTimeColumn TimeColumn of next TsBlock
+   * @param nextValueColumn Value Column of next TsBlock
+   * @return true if we get enough information to fill current column, and we 
can stop getting next
+   *     TsBlock and calling prepareForNext. false if we still don't get 
enough information to fill
+   *     current column, and still need to keep getting next TsBlock and then 
call prepareForNext
+   */
+  boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column 
nextValueColumn);
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityFill.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityFill.java
new file mode 100644
index 0000000000..25a5264bb7
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityFill.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.identity;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public class IdentityFill implements IFill {
+
+  @Override
+  public Column fill(Column valueColumn) {
+    return valueColumn;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
new file mode 100644
index 0000000000..9b1cb93a5a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process.fill.identity;
+
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class IdentityLinearFill implements ILinearFill {
+
+  @Override
+  public Column fill(TimeColumn timeColumn, Column valueColumn) {
+    return valueColumn;
+  }
+
+  @Override
+  public boolean needPrepareForNext(long time, Column valueColumn) {
+    return false;
+  }
+
+  @Override
+  public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column 
nextValueColumn) {
+    throw new IllegalArgumentException(
+        "We won't call prepareForNext in IdentityLinearFill, because 
needPrepareForNext() method will always return false.");
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
index 92685b9c38..b813661058 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
 
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
@@ -31,7 +32,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
  * the closest timestamp after T. Linear Fill function calculation only 
supports numeric types
  * including long, int, double and float.
  */
-public abstract class LinearFill {
+public abstract class LinearFill implements ILinearFill {
 
   private final TimeComparator timeComparator;
 
@@ -56,6 +57,7 @@ public abstract class LinearFill {
    * @param valueColumn valueColumn that need to be filled
    * @return Value Column that has been filled
    */
+  @Override
   public Column fill(TimeColumn timeColumn, Column valueColumn) {
     int size = valueColumn.getPositionCount();
     // if this valueColumn is empty, just return itself;
@@ -119,6 +121,7 @@ public abstract class LinearFill {
    *     TsBlock and then call prepareForNext. false if valueColumn can be 
filled using current
    *     information, and we can directly call fill() function
    */
+  @Override
   public boolean needPrepareForNext(long time, Column valueColumn) {
     return timeComparator.inFillBound(nextTime, time)
         && valueColumn.isNull(valueColumn.getPositionCount() - 1);
@@ -132,6 +135,7 @@ public abstract class LinearFill {
    *     TsBlock and calling prepareForNext. false if we still don't get 
enough information to fill
    *     current column, and still need to keep getting next TsBlock and then 
call prepareForNext
    */
+  @Override
   public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column 
nextValueColumn) {
     checkArgument(
         nextTimeColumn.getPositionCount() > 0
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 0a4aeccfac..554b821e15 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -49,7 +49,6 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
-import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn;
@@ -412,36 +411,6 @@ public class Analyzer {
           FillComponent fillComponent = queryStatement.getFillComponent();
           List<Expression> fillColumnList =
               
outputExpressions.stream().map(Pair::getLeft).distinct().collect(Collectors.toList());
-          if (fillComponent.getFillPolicy() == FillPolicy.VALUE) {
-            for (Expression fillColumn : fillColumnList) {
-              TSDataType checkedDataType = 
typeProvider.getType(fillColumn.getExpressionString());
-              if 
(!fillComponent.getFillValue().isDataTypeConsistency(checkedDataType)) {
-                throw new SemanticException(
-                    String.format(
-                        "Data type mismatch: column '%s' (dataType '%s') 
doesn't support fill with '%s' (dataType '%s').",
-                        fillColumn.getExpressionString(),
-                        checkedDataType,
-                        fillComponent.getFillValue().getBinary(),
-                        fillComponent.getFillValue().getDataTypeString()));
-              }
-            }
-          } else if (fillComponent.getFillPolicy() == FillPolicy.LINEAR) {
-            // TODO support linear fill in align by device query
-            if (queryStatement.isAlignByDevice()) {
-              throw new SemanticException(
-                  "Linear fill is not supported in align by device query 
yet.");
-            }
-
-            for (Expression fillColumn : fillColumnList) {
-              TSDataType checkedDataType = 
typeProvider.getType(fillColumn.getExpressionString());
-              if (!checkedDataType.isNumeric()) {
-                throw new SemanticException(
-                    String.format(
-                        "Data type mismatch: column '%s' (dataType '%s') 
doesn't support linear fill.",
-                        fillColumn.getExpressionString(), checkedDataType));
-              }
-            }
-          }
           analysis.setFillDescriptor(
               new FillDescriptor(fillComponent.getFillPolicy(), 
fillComponent.getFillValue()));
         }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index ce8afcf154..98b6569433 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -59,16 +59,18 @@ import 
org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BooleanConstantFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.DoubleConstantFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.FloatConstantFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.IntConstantFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.LongConstantFill;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityFill;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityLinearFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.DoubleLinearFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.IntLinearFill;
-import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LongLinearFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.BinaryPreviousFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.BooleanPreviousFill;
@@ -180,6 +182,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
 import static 
org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
+import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.BOOLEAN;
 
 /**
  * Used to plan a fragment instance. Currently, we simply change it from 
PlanNode to executable
@@ -197,6 +200,10 @@ public class LocalExecutionPlanner {
 
   private static final TimeComparator DESC_TIME_COMPARATOR = new 
DescTimeComparator();
 
+  private static final IdentityFill IDENTITY_FILL = new IdentityFill();
+
+  private static final IdentityLinearFill IDENTITY_LINEAR_FILL = new 
IdentityLinearFill();
+
   public static LocalExecutionPlanner getInstance() {
     return InstanceHolder.INSTANCE;
   }
@@ -676,6 +683,10 @@ public class LocalExecutionPlanner {
         int inputColumns, List<TSDataType> inputDataTypes, Literal literal) {
       IFill[] constantFill = new IFill[inputColumns];
       for (int i = 0; i < inputColumns; i++) {
+        if (!literal.isDataTypeConsistency(inputDataTypes.get(i))) {
+          constantFill[i] = IDENTITY_FILL;
+          continue;
+        }
         switch (inputDataTypes.get(i)) {
           case BOOLEAN:
             constantFill[i] = new BooleanConstantFill(literal.getBoolean());
@@ -731,9 +742,9 @@ public class LocalExecutionPlanner {
       return previousFill;
     }
 
-    private LinearFill[] getLinearFill(
+    private ILinearFill[] getLinearFill(
         int inputColumns, List<TSDataType> inputDataTypes, boolean ascending) {
-      LinearFill[] linearFill = new LinearFill[inputColumns];
+      ILinearFill[] linearFill = new ILinearFill[inputColumns];
       TimeComparator timeComparator = ascending ? ASC_TIME_COMPARATOR : 
DESC_TIME_COMPARATOR;
       for (int i = 0; i < inputColumns; i++) {
         switch (inputDataTypes.get(i)) {
@@ -751,8 +762,7 @@ public class LocalExecutionPlanner {
             break;
           case BOOLEAN:
           case TEXT:
-            throw new UnsupportedOperationException(
-                "DataType: " + inputDataTypes.get(i) + " doesn't support 
linear fill.");
+            linearFill[i] = IDENTITY_LINEAR_FILL;
           default:
             throw new IllegalArgumentException("Unknown data type: " + 
inputDataTypes.get(i));
         }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
index 9852707056..23ca161fa2 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
 import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
+import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityLinearFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
@@ -42,6 +44,7 @@ import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class LinearFillOperatorTest {
@@ -1057,4 +1060,104 @@ public class LinearFillOperatorTest {
       instanceNotificationExecutor.shutdown();
     }
   }
+
+  @Test
+  public void batchLinearFillBooleanTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+      ILinearFill[] fillArray = new ILinearFill[] {new IdentityLinearFill()};
+      LinearFillOperator fillOperator =
+          new LinearFillOperator(
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              fillArray,
+              new Operator() {
+                private int index = 0;
+                private final boolean[][][] value =
+                    new boolean[][][] {
+                      {{true}}, {{true}}, {{false}}, {{false}}, {{true}}, 
{{false}}, {{true}}
+                    };
+                final boolean[][][] isNull =
+                    new boolean[][][] {
+                      {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, 
{{true}}, {{true}}
+                    };
+
+                @Override
+                public OperatorContext getOperatorContext() {
+                  return null;
+                }
+
+                @Override
+                public TsBlock next() {
+                  TsBlockBuilder builder = new 
TsBlockBuilder(ImmutableList.of(TSDataType.BOOLEAN));
+                  for (int i = 0; i < 1; i++) {
+                    builder.getTimeColumnBuilder().writeLong(i + index);
+                    for (int j = 0; j < 1; j++) {
+                      if (isNull[index][i][j]) {
+                        builder.getColumnBuilder(j).appendNull();
+                      } else {
+                        
builder.getColumnBuilder(j).writeBoolean(value[index][i][j]);
+                      }
+                    }
+                    builder.declarePosition();
+                  }
+                  index++;
+                  return builder.build();
+                }
+
+                @Override
+                public boolean hasNext() {
+                  return index < 7;
+                }
+
+                @Override
+                public boolean isFinished() {
+                  return index >= 7;
+                }
+              });
+
+      int count = 0;
+      boolean[][][] res =
+          new boolean[][][] {
+            {{true}}, {{true}}, {{false}}, {{false}}, {{true}}, {{false}}, 
{{true}}
+          };
+      boolean[][][] isNull =
+          new boolean[][][] {
+            {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, {{true}}, 
{{true}}
+          };
+
+      while (fillOperator.hasNext()) {
+        TsBlock block = fillOperator.next();
+        assertNotNull(block);
+        for (int i = 0; i < block.getPositionCount(); i++) {
+          long expectedTime = i + count;
+          assertEquals(expectedTime, block.getTimeByIndex(i));
+          for (int j = 0; j < 1; j++) {
+            assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+            if (!isNull[count][i][j]) {
+              assertEquals(res[count][i][j], block.getColumn(j).getBoolean(i));
+            }
+          }
+        }
+        count++;
+      }
+
+      assertTrue(fillOperator.isFinished());
+      assertEquals(res.length, count);
+
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
 }

Reply via email to