This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/sonar
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/sonar by this push:
     new 206468e4fba server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join
206468e4fba is described below

commit 206468e4fbadc1f418c4b499a382cf9e05096898
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 21 10:01:17 2023 +0800

    server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join
---
 .../process/join/HorizontallyConcatOperator.java   |  11 +-
 .../process/join/RowBasedTimeJoinOperator.java     |  66 ++--
 .../operator/process/join/TimeJoinOperator.java    | 292 ---------------
 .../process/join/merge/AscTimeComparator.java      |   3 +-
 .../operator/process/join/merge/ColumnMerger.java  |   9 +-
 .../process/join/merge/DescTimeComparator.java     |   3 +-
 .../process/join/merge/MergeSortComparator.java    |  10 +-
 .../process/join/merge/MultiColumnMerger.java      |  12 +-
 .../join/merge/NonOverlappedMultiColumnMerger.java |   5 +-
 .../process/join/merge/SingleColumnMerger.java     |  26 +-
 .../process/join/merge/SortKeyComparator.java      |   9 +-
 .../process/join/merge/TimeComparator.java         |   7 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  10 +-
 .../execution/operator/TimeJoinOperatorTest.java   | 414 ---------------------
 14 files changed, 100 insertions(+), 777 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
index 2bb083b9db4..2c4c6e30828 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.join;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -42,7 +43,7 @@ import static com.google.common.base.Preconditions.checkArgument;
  */
 public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
 
-  /** start index for each input TsBlocks and size of it is equal to inputTsBlocks */
+  /** start index for each input TsBlocks and size of it is equal to inputTsBlocks. */
   private final int[] inputIndex;
 
   private final TsBlockBuilder tsBlockBuilder;
@@ -116,10 +117,11 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
     if (finished) {
       return true;
     }
-    return finished =
+    finished =
         isEmpty(readyChildIndex)
             && (children.get(readyChildIndex) == null
                 || !children.get(readyChildIndex).hasNextWithTimer());
+    return finished;
   }
 
   @Override
@@ -144,7 +146,8 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+    long currentRetainedSize = 0;
+    long minChildReturnSize = Long.MAX_VALUE;
     for (Operator child : children) {
       long maxReturnSize = child.calculateMaxReturnSize();
       currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
@@ -156,7 +159,7 @@ public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
 
   /**
    * If the tsBlock of tsBlockIndex is null or has no more data in the tsBlock, return true; else
-   * return false;
+   * return false.
    */
   @Override
   protected boolean isEmpty(int tsBlockIndex) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 1912d04ce4b..d5258682931 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.join;
 
 import org.apache.iotdb.commons.utils.TestOnly;
@@ -42,10 +43,10 @@ import static com.google.common.util.concurrent.Futures.successfulAsList;
 
 public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
 
-  /** start index for each input TsBlocks and size of it is equal to inputTsBlocks */
+  /** start index for each input TsBlocks and size of it is equal to inputTsBlocks. */
   private final int[] inputIndex;
 
-  /** used to record current index for input TsBlocks after merging */
+  /** used to record current index for input TsBlocks after merging. */
   private final int[] shadowInputIndex;
 
   /**
@@ -61,7 +62,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
 
   /**
    * this field indicates each data type for output columns(not including time column) of
-   * TimeJoinOperator its size should be equal to outputColumnCount
+   * TimeJoinOperator its size should be equal to outputColumnCount.
    */
   private final List<TSDataType> dataTypes;
 
@@ -149,8 +150,8 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
 
     if (timeSelector.isEmpty()) {
       // return empty TsBlock
-      TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(0, dataTypes);
-      return tsBlockBuilder.build();
+      TsBlockBuilder emptyTsBlockBuilder = new TsBlockBuilder(0, dataTypes);
+      return emptyTsBlockBuilder.build();
     }
 
     TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
@@ -158,31 +159,41 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
     do {
       currentTime = timeSelector.pollFirst();
       timeBuilder.writeLong(currentTime);
-      for (int i = 0; i < outputColumnCount; i++) {
-        ColumnMerger merger = mergers.get(i);
-        merger.mergeColumn(
-            inputTsBlocks,
-            inputIndex,
-            shadowInputIndex,
-            currentTime,
-            tsBlockBuilder.getColumnBuilder(i));
-      }
 
-      for (int i = 0; i < inputOperatorsCount; i++) {
-        if (inputIndex[i] != shadowInputIndex[i]) {
-          inputIndex[i] = shadowInputIndex[i];
-          if (!isEmpty(i)) {
-            updateTimeSelector(i);
-          }
-        }
-      }
+      appendOneRow(currentTime);
       tsBlockBuilder.declarePosition();
+
+      prepareForTimeHeap();
+
     } while (comparator.canContinue(currentTime, currentEndTime) && !timeSelector.isEmpty());
 
     resultTsBlock = tsBlockBuilder.build();
     return checkTsBlockSizeAndGetResult();
   }
 
+  private void appendOneRow(long currentTime) {
+    for (int i = 0; i < outputColumnCount; i++) {
+      ColumnMerger merger = mergers.get(i);
+      merger.mergeColumn(
+          inputTsBlocks,
+          inputIndex,
+          shadowInputIndex,
+          currentTime,
+          tsBlockBuilder.getColumnBuilder(i));
+    }
+  }
+
+  private void prepareForTimeHeap() {
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (inputIndex[i] != shadowInputIndex[i]) {
+        inputIndex[i] = shadowInputIndex[i];
+        if (!isEmpty(i)) {
+          updateTimeSelector(i);
+        }
+      }
+    }
+  }
+
   @Override
   public boolean hasNext() throws Exception {
     if (finished) {
@@ -250,7 +261,8 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+    long currentRetainedSize = 0;
+    long minChildReturnSize = Long.MAX_VALUE;
     for (Operator child : children) {
       long maxReturnSize = child.calculateMaxReturnSize();
       currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
@@ -274,7 +286,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
   protected boolean prepareInput() throws Exception {
     boolean allReady = true;
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
+      if (needCallNext(i)) {
         continue;
       }
       if (canCallNext[i]) {
@@ -300,6 +312,10 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
     return allReady;
   }
 
+  private boolean needCallNext(int i) {
+    return noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null;
+  }
+
   @TestOnly
   public List<Operator> getChildren() {
     return children;
@@ -313,7 +329,7 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
 
   /**
    * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
-   * return false;
+   * return false.
    */
   @Override
   protected boolean isEmpty(int columnIndex) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
deleted file mode 100644
index 2b60ad6d3db..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.execution.operator.process.join;
-
-import org.apache.iotdb.db.mpp.execution.operator.AbstractOperator;
-import org.apache.iotdb.db.mpp.execution.operator.Operator;
-import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
-import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
-import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.util.concurrent.Futures.successfulAsList;
-
-@Deprecated
-public class TimeJoinOperator extends AbstractOperator {
-
-  private final List<Operator> children;
-
-  private final int inputOperatorsCount;
-
-  /** TsBlock from child operator. Only one cache now. */
-  private final TsBlock[] inputTsBlocks;
-
-  /** start index for each input TsBlocks and size of it is equal to inputTsBlocks */
-  private final int[] inputIndex;
-
-  /** used to record current index for input TsBlocks after merging */
-  private final int[] shadowInputIndex;
-
-  /**
-   * Represent whether there are more tsBlocks from ith child operator. If all elements in
-   * noMoreTsBlocks[] are true and inputTsBlocks[] are consumed completely, this operator is
-   * finished.
-   */
-  private final boolean[] noMoreTsBlocks;
-
-  private final TimeSelector timeSelector;
-
-  private final int outputColumnCount;
-
-  /**
-   * this field indicates each data type for output columns(not including time column) of
-   * TimeJoinOperator its size should be equal to outputColumnCount
-   */
-  private final List<TSDataType> dataTypes;
-
-  private final List<ColumnMerger> mergers;
-
-  private final TsBlockBuilder tsBlockBuilder;
-
-  private boolean finished;
-
-  private final TimeComparator comparator;
-
-  public TimeJoinOperator(
-      OperatorContext operatorContext,
-      List<Operator> children,
-      Ordering mergeOrder,
-      List<TSDataType> dataTypes,
-      List<ColumnMerger> mergers,
-      TimeComparator comparator) {
-    checkArgument(
-        children != null && !children.isEmpty(),
-        "child size of TimeJoinOperator should be larger than 0");
-    this.operatorContext = operatorContext;
-    this.children = children;
-    this.inputOperatorsCount = children.size();
-    this.inputTsBlocks = new TsBlock[this.inputOperatorsCount];
-    this.inputIndex = new int[this.inputOperatorsCount];
-    this.shadowInputIndex = new int[this.inputOperatorsCount];
-    this.noMoreTsBlocks = new boolean[this.inputOperatorsCount];
-    this.timeSelector = new TimeSelector(this.inputOperatorsCount << 1, Ordering.ASC == mergeOrder);
-    this.outputColumnCount = dataTypes.size();
-    this.dataTypes = dataTypes;
-    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
-    this.mergers = mergers;
-    this.comparator = comparator;
-  }
-
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!noMoreTsBlocks[i] && empty(i)) {
-        ListenableFuture<?> blocked = children.get(i).isBlocked();
-        if (!blocked.isDone()) {
-          listenableFutures.add(blocked);
-        }
-      }
-    }
-    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
-  }
-
-  @Override
-  public TsBlock next() throws Exception {
-    if (retainedTsBlock != null) {
-      return getResultFromRetainedTsBlock();
-    }
-    tsBlockBuilder.reset();
-    // end time for returned TsBlock this time, it's the min/max end time among all the children
-    // TsBlocks order by asc/desc
-    long currentEndTime = 0;
-    boolean init = false;
-
-    // get TsBlock for each input, put their time stamp into TimeSelector and then use the min Time
-    // among all the input TsBlock as the current output TsBlock's endTime.
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!noMoreTsBlocks[i] && empty(i)) {
-        if (children.get(i).hasNextWithTimer()) {
-          inputIndex[i] = 0;
-          inputTsBlocks[i] = children.get(i).nextWithTimer();
-          if (!empty(i)) {
-            int rowSize = inputTsBlocks[i].getPositionCount();
-            for (int row = 0; row < rowSize; row++) {
-              timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
-            }
-          } else {
-            // child operator has next but return an empty TsBlock which means that it may not
-            // finish calculation in given time slice.
-            // In such case, TimeJoinOperator can't go on calculating, so we just return null.
-            // We can also use the while loop here to continuously call the hasNext() and next()
-            // methods of the child operator until its hasNext() returns false or the next() gets
-            // the data that is not empty, but this will cause the execution time of the while
-            // loop
-            // to be uncontrollable and may exceed all allocated time slice
-            return null;
-          }
-        } else { // no more tsBlock
-          noMoreTsBlocks[i] = true;
-          inputTsBlocks[i] = null;
-        }
-      }
-      // update the currentEndTime if the TsBlock is not empty
-      if (!empty(i)) {
-        currentEndTime =
-            init
-                ? comparator.getCurrentEndTime(currentEndTime, inputTsBlocks[i].getEndTime())
-                : inputTsBlocks[i].getEndTime();
-        init = true;
-      }
-    }
-
-    if (timeSelector.isEmpty()) {
-      // return empty TsBlock
-      TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(0, dataTypes);
-      return tsBlockBuilder.build();
-    }
-
-    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
-    while (!timeSelector.isEmpty()
-        && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime)) {
-      timeBuilder.writeLong(timeSelector.pollFirst());
-      tsBlockBuilder.declarePosition();
-    }
-
-    for (int i = 0; i < outputColumnCount; i++) {
-      ColumnMerger merger = mergers.get(i);
-      merger.mergeColumn(
-          inputTsBlocks,
-          inputIndex,
-          shadowInputIndex,
-          timeBuilder,
-          currentEndTime,
-          tsBlockBuilder.getColumnBuilder(i));
-    }
-
-    // update inputIndex using shadowInputIndex
-    System.arraycopy(shadowInputIndex, 0, inputIndex, 0, inputOperatorsCount);
-
-    resultTsBlock = tsBlockBuilder.build();
-    return checkTsBlockSizeAndGetResult();
-  }
-
-  @Override
-  public boolean hasNext() throws Exception {
-    if (finished) {
-      return false;
-    }
-    if (retainedTsBlock != null) {
-      return true;
-    }
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!empty(i)) {
-        return true;
-      } else if (!noMoreTsBlocks[i]) {
-        if (children.get(i).hasNextWithTimer()) {
-          return true;
-        } else {
-          noMoreTsBlocks[i] = true;
-          inputTsBlocks[i] = null;
-        }
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public void close() throws Exception {
-    for (Operator child : children) {
-      child.close();
-    }
-  }
-
-  @Override
-  public boolean isFinished() throws Exception {
-    if (finished) {
-      return true;
-    }
-    if (retainedTsBlock != null) {
-      return false;
-    }
-
-    finished = true;
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      // has more tsBlock output from children[i] or has cached tsBlock in inputTsBlocks[i]
-      if (!noMoreTsBlocks[i] || !empty(i)) {
-        finished = false;
-        break;
-      }
-    }
-    return finished;
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    long maxPeekMemory = 0;
-    long childrenMaxPeekMemory = 0;
-    for (Operator child : children) {
-      childrenMaxPeekMemory =
-          Math.max(childrenMaxPeekMemory, maxPeekMemory + child.calculateMaxPeekMemory());
-      maxPeekMemory +=
-          (child.calculateMaxReturnSize() + child.calculateRetainedSizeAfterCallingNext());
-    }
-
-    maxPeekMemory += calculateMaxReturnSize();
-    return Math.max(maxPeekMemory, childrenMaxPeekMemory);
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    // time + all value columns
-    return maxReturnSize;
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
-    for (Operator child : children) {
-      long maxReturnSize = child.calculateMaxReturnSize();
-      currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
-      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
-    }
-    // max cached TsBlock
-    return currentRetainedSize - minChildReturnSize;
-  }
-
-  /**
-   * If the tsBlock of columnIndex is null or has no more data in the tsBlock, return true; else
-   * return false;
-   */
-  private boolean empty(int columnIndex) {
-    return inputTsBlocks[columnIndex] == null
-        || inputTsBlocks[columnIndex].getPositionCount() == inputIndex[columnIndex];
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
index e458b4aff38..4d24c857edb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
@@ -16,11 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 public class AscTimeComparator implements TimeComparator {
 
-  /** @return if order by time asc, return true if time <= endTime, otherwise false */
+  /** return if order by time asc, return true if time <= endTime, otherwise false. */
   @Override
   public boolean satisfyCurEndTime(long time, long endTime) {
     return time <= endTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
index 7a00f2adf3b..38664d0cf5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
@@ -16,16 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
-/** used to merge columns belonging to same series into one column */
+/** used to merge columns belonging to same series into one column. */
 public interface ColumnMerger {
 
   /**
+   * judge whether TsBlock at tsBlockIndex is empty.
+   *
    * @param tsBlockIndex index
    * @param inputTsBlocks input TsBlock array
    * @param inputIndex current index for each input TsBlock and size of it is equal to inputTsBlocks
@@ -39,7 +42,7 @@ public interface ColumnMerger {
 
   /**
    * merge columns belonging to same series into one column, merge until each input column's time is
-   * larger than currentEndTime
+   * larger than currentEndTime.
    *
    * @param inputTsBlocks all source TsBlocks, some of which will contain source column
    * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks,
@@ -61,7 +64,7 @@ public interface ColumnMerger {
 
   /**
    * merge columns belonging to same series into one column, merge just one row whose time is equal
-   * to currentTime
+   * to currentTime.
    *
    * @param inputTsBlocks all source TsBlocks, some of which will contain source column
    * @param inputIndex start index for each source TsBlock and size of it is equal to inputTsBlocks,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
index e13661823d9..5b1a097c503 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
@@ -16,11 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 public class DescTimeComparator implements TimeComparator {
 
-  /** @return if order by time desc, return true if time >= endTime, otherwise false */
+  /** return if order by time desc, return true if time >= endTime, otherwise false. */
   @Override
   public boolean satisfyCurEndTime(long time, long endTime) {
     return time >= endTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
index c949c2990d0..efad0370c5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
@@ -42,7 +42,11 @@ public class MergeSortComparator {
               (SortKey sortKey) -> sortKey.tsBlock.getTimeByIndex(sortKey.rowIndex))
           .reversed();
 
-  /** @param indexList -1 for time column */
+  private MergeSortComparator() {
+    // util class doesn't need constructor
+  }
+
+  /** -1 in index is for time column. */
   public static Comparator<SortKey> getComparator(
       List<SortItem> sortItemList, List<Integer> indexList, List<TSDataType> dataTypeList) {
 
@@ -50,7 +54,9 @@ public class MergeSortComparator {
     List<Comparator<SortKey>> list = new ArrayList<>(indexList.size());
     for (int i = 0; i < indexList.size(); i++) {
       int index = indexList.get(i);
-      if (index == -2) continue;
+      if (index == -2) {
+        continue;
+      }
       TSDataType dataType = dataTypeList.get(i);
       boolean asc = sortItemList.get(i).getOrdering() == Ordering.ASC;
       boolean nullFirst = sortItemList.get(i).getNullOrdering() == NullOrdering.FIRST;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
index 65943460341..ad98076ff8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -27,7 +28,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.List;
 
-/** has more than one input column, but these columns' time is overlapped */
+/** has more than one input column, but these columns' time is overlapped. */
 public class MultiColumnMerger implements ColumnMerger {
 
   private final List<InputLocation> inputLocations;
@@ -36,6 +37,7 @@ public class MultiColumnMerger implements ColumnMerger {
     this.inputLocations = inputLocations;
   }
 
+  @SuppressWarnings("squid:S3776")
   @Override
   public void mergeColumn(
       TsBlock[] inputTsBlocks,
@@ -128,7 +130,6 @@ public class MultiColumnMerger implements ColumnMerger {
           // value of current location's input column is not null
           // here we only append value if there is no value appended before and current value is
           // null
-          // TODO That means we choose first value as the final value if there exist timestamp
           // belonging to more than one DataRegion, we need to choose which one is latest
           if (!appendValue && !valueColumn.isNull(index)) {
             columnBuilder.write(valueColumn, index);
@@ -138,13 +139,6 @@ public class MultiColumnMerger implements ColumnMerger {
           index++;
           // update the index after merging
           updatedInputIndex[tsBlockIndex] = index;
-          // we can never safely set appendValue to true and then break the loop, because these
-          // input
-          // columns' time may be overlapped, we should increase each column's index whose time is
-          // equal to currentTime
-          // if (appendValue) {
-          //    break;
-          //  }
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
index c338130b8a7..da55cd78151 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -25,7 +26,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.List;
 
-/** has more than one input column, but these columns' time is not overlapped */
+/** has more than one input column, but these columns' time is not overlapped. */
 public class NonOverlappedMultiColumnMerger implements ColumnMerger {
 
   private final List<InputLocation> inputLocations;
@@ -36,7 +37,7 @@ public class NonOverlappedMultiColumnMerger implements ColumnMerger {
   private int index;
 
   /**
-   * these columns' time should never be overlapped
+   * these columns' time should never be overlapped.
    *
    * @param inputLocations The time order in TsBlock represented by inputLocations should be
    *     incremented by timestamp if it is order by time asc, otherwise decreased by timestamp if it
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
index 0c9f7e60e71..0b12a890dc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -25,7 +26,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
-/** only has one input column */
+/** only has one input column. */
 public class SingleColumnMerger implements ColumnMerger {
 
   private final InputLocation location;
@@ -57,6 +58,18 @@ public class SingleColumnMerger implements ColumnMerger {
         comparator);
   }
 
+  @Override
+  public void mergeColumn(
+      TsBlock[] inputTsBlocks,
+      int[] inputIndex,
+      int[] updatedInputIndex,
+      long currentTime,
+      ColumnBuilder columnBuilder) {
+    mergeOneColumn(
+        inputTsBlocks, inputIndex, updatedInputIndex, currentTime, columnBuilder, location);
+  }
+
+  @SuppressWarnings({"squid:S107", "squid:S3776"})
   public static void mergeOneColumn(
       TsBlock[] inputTsBlocks,
       int[] inputIndex,
@@ -111,17 +124,6 @@ public class SingleColumnMerger implements ColumnMerger {
     updatedInputIndex[tsBlockIndex] = index;
   }
 
-  @Override
-  public void mergeColumn(
-      TsBlock[] inputTsBlocks,
-      int[] inputIndex,
-      int[] updatedInputIndex,
-      long currentTime,
-      ColumnBuilder columnBuilder) {
-    mergeOneColumn(
-        inputTsBlocks, inputIndex, updatedInputIndex, currentTime, 
columnBuilder, location);
-  }
-
   public static void mergeOneColumn(
       TsBlock[] inputTsBlocks,
       int[] inputIndex,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SortKeyComparator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SortKeyComparator.java
index fa4a6a1a011..a8f4384e14d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SortKeyComparator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SortKeyComparator.java
@@ -21,10 +21,9 @@ package 
org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 import org.apache.iotdb.db.utils.datastructure.SortKey;
 
-import java.io.Serializable;
 import java.util.Comparator;
 
-public class SortKeyComparator implements Comparator<SortKey>, Serializable {
+public class SortKeyComparator implements Comparator<SortKey> {
 
   private final boolean nullFirst;
   private final int index;
@@ -44,7 +43,11 @@ public class SortKeyComparator implements 
Comparator<SortKey>, Serializable {
     if (!o1IsNull && !o2IsNull) {
       return originalComparator.compare(o1, o2);
     } else if (o1IsNull) {
-      return o2IsNull ? 0 : (nullFirst ? -1 : 1);
+      if (o2IsNull) {
+        return 0;
+      } else {
+        return nullFirst ? -1 : 1;
+      }
     } else {
       return nullFirst ? 1 : -1;
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
index 846af9810db..d71678b7067 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
@@ -16,16 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
 
 public interface TimeComparator {
 
-  /** @return true if time is satisfied with endTime, otherwise false */
+  /** return true if time is satisfied with endTime, otherwise false. */
   boolean satisfyCurEndTime(long time, long endTime);
 
-  /** @return min(time1, time2) if order by time asc, max(time1, time2) if 
order by desc */
+  /** return min(time1, time2) if order by time asc, max(time1, time2) if 
order by desc. */
   long getCurrentEndTime(long time1, long time2);
 
-  /** @return time < endTime if order by time asc, time > endTime if order by 
desc */
+  /** return time < endTime if order by time asc, time > endTime if order by 
desc. */
   boolean canContinue(long time, long endTime);
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 0ebf65484be..cd45d7138d2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -84,7 +84,6 @@ import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.IntPrevi
 import 
org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.LongPreviousFill;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.join.HorizontallyConcatOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
-import 
org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
@@ -224,7 +223,6 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.Validate;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1045,7 +1043,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
           node.getZoneId(),
           expressionTypes,
           node.getScanOrder() == Ordering.ASC);
-    } catch (QueryProcessException | IOException e) {
+    } catch (QueryProcessException e) {
       throw new RuntimeException(e);
     }
   }
@@ -1087,7 +1085,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
       }
     }
 
-    // init UDTFContext;
+    // init UDTFContext
     UDTFContext filterContext = new UDTFContext(node.getZoneId());
     filterContext.constructUdfExecutors(new Expression[] {filterExpression});
 
@@ -1186,7 +1184,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
           node.getZoneId(),
           expressionTypes,
           node.getScanOrder() == Ordering.ASC);
-    } catch (QueryProcessException | IOException e) {
+    } catch (QueryProcessException e) {
       throw new RuntimeException(e);
     }
   }
@@ -1762,7 +1760,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             .addOperatorContext(
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
-                TimeJoinOperator.class.getSimpleName());
+                RowBasedTimeJoinOperator.class.getSimpleName());
     TimeComparator timeComparator =
         node.getMergeOrder() == Ordering.ASC ? ASC_TIME_COMPARATOR : 
DESC_TIME_COMPARATOR;
     List<OutputColumn> outputColumns = generateOutputColumnsFromChildren(node);
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
deleted file mode 100644
index ef122f78b87..00000000000
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.execution.operator;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import 
org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
-import 
org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
-import 
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
-import 
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
-import 
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
-import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
-import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import io.airlift.units.Duration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class TimeJoinOperatorTest {
-  private static final String TIME_JOIN_OPERATOR_TEST_SG = 
"root.TimeJoinOperatorTest";
-  private final List<String> deviceIds = new ArrayList<>();
-  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
-
-  private final List<TsFileResource> seqResources = new ArrayList<>();
-  private final List<TsFileResource> unSeqResources = new ArrayList<>();
-
-  @Before
-  public void setUp() throws MetadataException, IOException, 
WriteProcessException {
-    SeriesReaderTestUtil.setUp(
-        measurementSchemas, deviceIds, seqResources, unSeqResources, 
TIME_JOIN_OPERATOR_TEST_SG);
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
-  }
-
-  @Test
-  public void batchTest1() throws Exception {
-    ExecutorService instanceNotificationExecutor =
-        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
-    try {
-      MeasurementPath measurementPath1 =
-          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", 
TSDataType.INT32);
-      Set<String> allSensors = new HashSet<>();
-      allSensors.add("sensor0");
-      allSensors.add("sensor1");
-      QueryId queryId = new QueryId("stub_query");
-      FragmentInstanceId instanceId =
-          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
-      FragmentInstanceStateMachine stateMachine =
-          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
-      FragmentInstanceContext fragmentInstanceContext =
-          createFragmentInstanceContext(instanceId, stateMachine);
-      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
-      PlanNodeId planNodeId1 = new PlanNodeId("1");
-      driverContext.addOperatorContext(1, planNodeId1, 
SeriesScanOperator.class.getSimpleName());
-      PlanNodeId planNodeId2 = new PlanNodeId("2");
-      driverContext.addOperatorContext(2, planNodeId2, 
SeriesScanOperator.class.getSimpleName());
-      driverContext.addOperatorContext(
-          3, new PlanNodeId("3"), 
RowBasedTimeJoinOperator.class.getSimpleName());
-
-      SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
-      scanOptionsBuilder.withAllSensors(allSensors);
-      SeriesScanOperator seriesScanOperator1 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(0),
-              planNodeId1,
-              measurementPath1,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      seriesScanOperator1.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator1
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      MeasurementPath measurementPath2 =
-          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", 
TSDataType.INT32);
-      SeriesScanOperator seriesScanOperator2 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(1),
-              planNodeId2,
-              measurementPath2,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      seriesScanOperator2.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator2
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      TimeJoinOperator timeJoinOperator =
-          new TimeJoinOperator(
-              driverContext.getOperatorContexts().get(2),
-              Arrays.asList(seriesScanOperator1, seriesScanOperator2),
-              Ordering.ASC,
-              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
-              Arrays.asList(
-                  new SingleColumnMerger(new InputLocation(0, 0), new 
AscTimeComparator()),
-                  new SingleColumnMerger(new InputLocation(1, 0), new 
AscTimeComparator())),
-              new AscTimeComparator());
-      int count = 0;
-      while (timeJoinOperator.hasNext()) {
-        TsBlock tsBlock = timeJoinOperator.next();
-        assertEquals(2, tsBlock.getValueColumnCount());
-        assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
-        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
-        for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
-          assertEquals(count, tsBlock.getTimeByIndex(i));
-          if ((long) count < 200) {
-            assertEquals(20000 + (long) count, tsBlock.getColumn(0).getInt(i));
-            assertEquals(20000 + (long) count, tsBlock.getColumn(1).getInt(i));
-          } else if ((long) count < 260
-              || ((long) count >= 300 && (long) count < 380)
-              || (long) count >= 400) {
-            assertEquals(10000 + (long) count, tsBlock.getColumn(0).getInt(i));
-            assertEquals(10000 + (long) count, tsBlock.getColumn(1).getInt(i));
-          } else {
-            assertEquals(count, tsBlock.getColumn(0).getInt(i));
-            assertEquals(count, tsBlock.getColumn(1).getInt(i));
-          }
-        }
-      }
-      assertEquals(500, count);
-    } catch (IllegalPathException e) {
-      e.printStackTrace();
-      fail();
-    } finally {
-      instanceNotificationExecutor.shutdown();
-    }
-  }
-
-  /** test time join with non-exist sensor */
-  @Test
-  public void batchTest2() throws Exception {
-    ExecutorService instanceNotificationExecutor =
-        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
-    try {
-      MeasurementPath measurementPath1 =
-          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", 
TSDataType.INT32);
-      Set<String> allSensors = new HashSet<>();
-      allSensors.add("sensor0");
-      allSensors.add("sensor1");
-      allSensors.add("error_sensor");
-      QueryId queryId = new QueryId("stub_query");
-      FragmentInstanceId instanceId =
-          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
-      FragmentInstanceStateMachine stateMachine =
-          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
-      FragmentInstanceContext fragmentInstanceContext =
-          createFragmentInstanceContext(instanceId, stateMachine);
-      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
-      PlanNodeId planNodeId1 = new PlanNodeId("1");
-      driverContext.addOperatorContext(1, planNodeId1, 
SeriesScanOperator.class.getSimpleName());
-      PlanNodeId planNodeId2 = new PlanNodeId("2");
-      driverContext.addOperatorContext(2, planNodeId2, 
SeriesScanOperator.class.getSimpleName());
-      PlanNodeId planNodeId3 = new PlanNodeId("3");
-      driverContext.addOperatorContext(3, planNodeId3, 
SeriesScanOperator.class.getSimpleName());
-      driverContext.addOperatorContext(
-          4, new PlanNodeId("4"), TimeJoinOperator.class.getSimpleName());
-
-      SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
-      scanOptionsBuilder.withAllSensors(allSensors);
-      SeriesScanOperator seriesScanOperator1 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(0),
-              planNodeId1,
-              measurementPath1,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      seriesScanOperator1.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator1
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      MeasurementPath measurementPath2 =
-          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", 
TSDataType.INT32);
-      SeriesScanOperator seriesScanOperator2 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(1),
-              planNodeId2,
-              measurementPath2,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      seriesScanOperator2.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator2
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      MeasurementPath measurementPath3 =
-          new MeasurementPath(
-              TIME_JOIN_OPERATOR_TEST_SG + ".device0.error_sensor", 
TSDataType.INT32);
-      SeriesScanOperator seriesScanOperator3 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(2),
-              planNodeId3,
-              measurementPath3,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      seriesScanOperator3.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator3
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      TimeJoinOperator timeJoinOperator =
-          new TimeJoinOperator(
-              driverContext.getOperatorContexts().get(3),
-              Arrays.asList(seriesScanOperator1, seriesScanOperator2, 
seriesScanOperator3),
-              Ordering.ASC,
-              Arrays.asList(TSDataType.INT32, TSDataType.INT32, 
TSDataType.INT32),
-              Arrays.asList(
-                  new SingleColumnMerger(new InputLocation(0, 0), new 
AscTimeComparator()),
-                  new SingleColumnMerger(new InputLocation(1, 0), new 
AscTimeComparator()),
-                  new SingleColumnMerger(new InputLocation(2, 0), new 
AscTimeComparator())),
-              new AscTimeComparator());
-      int count = 0;
-      while (timeJoinOperator.hasNext()) {
-        TsBlock tsBlock = timeJoinOperator.next();
-        assertEquals(3, tsBlock.getValueColumnCount());
-        assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
-        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
-        assertTrue(tsBlock.getColumn(2) instanceof RunLengthEncodedColumn);
-        for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
-          assertEquals(count, tsBlock.getTimeByIndex(i));
-          assertTrue(tsBlock.getColumn(2).isNull(i));
-          if ((long) count < 200) {
-            assertEquals(20000 + (long) count, tsBlock.getColumn(0).getInt(i));
-            assertEquals(20000 + (long) count, tsBlock.getColumn(1).getInt(i));
-          } else if ((long) count < 260
-              || ((long) count >= 300 && (long) count < 380)
-              || (long) count >= 400) {
-            assertEquals(10000 + (long) count, tsBlock.getColumn(0).getInt(i));
-            assertEquals(10000 + (long) count, tsBlock.getColumn(1).getInt(i));
-          } else {
-            assertEquals(count, tsBlock.getColumn(0).getInt(i));
-            assertEquals(count, tsBlock.getColumn(1).getInt(i));
-          }
-        }
-      }
-      assertEquals(500, count);
-    } catch (IllegalPathException e) {
-      e.printStackTrace();
-      fail();
-    } finally {
-      instanceNotificationExecutor.shutdown();
-    }
-  }
-
-  /** test time join with non-exist sensor and order by time desc */
-  @Test
-  public void batchTest3() throws Exception {
-    ExecutorService instanceNotificationExecutor =
-        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
-    try {
-      MeasurementPath measurementPath1 =
-          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0", 
TSDataType.INT32);
-      Set<String> allSensors = new HashSet<>();
-      allSensors.add("sensor0");
-      allSensors.add("sensor1");
-      allSensors.add("error_sensor");
-      QueryId queryId = new QueryId("stub_query");
-      FragmentInstanceId instanceId =
-          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
-      FragmentInstanceStateMachine stateMachine =
-          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
-      FragmentInstanceContext fragmentInstanceContext =
-          createFragmentInstanceContext(instanceId, stateMachine);
-      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
-      PlanNodeId planNodeId1 = new PlanNodeId("1");
-      driverContext.addOperatorContext(1, planNodeId1, 
SeriesScanOperator.class.getSimpleName());
-      PlanNodeId planNodeId2 = new PlanNodeId("2");
-      driverContext.addOperatorContext(2, planNodeId2, 
SeriesScanOperator.class.getSimpleName());
-      PlanNodeId planNodeId3 = new PlanNodeId("3");
-      driverContext.addOperatorContext(3, planNodeId3, 
SeriesScanOperator.class.getSimpleName());
-      driverContext.addOperatorContext(
-          4, new PlanNodeId("4"), TimeJoinOperator.class.getSimpleName());
-
-      SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
-      scanOptionsBuilder.withAllSensors(allSensors);
-      SeriesScanOperator seriesScanOperator1 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(0),
-              planNodeId1,
-              measurementPath1,
-              Ordering.DESC,
-              scanOptionsBuilder.build());
-      seriesScanOperator1.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator1
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      MeasurementPath measurementPath2 =
-          new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1", 
TSDataType.INT32);
-      SeriesScanOperator seriesScanOperator2 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(1),
-              planNodeId2,
-              measurementPath2,
-              Ordering.DESC,
-              scanOptionsBuilder.build());
-      seriesScanOperator2.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator2
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      MeasurementPath measurementPath3 =
-          new MeasurementPath(
-              TIME_JOIN_OPERATOR_TEST_SG + ".device0.error_sensor", 
TSDataType.INT32);
-      SeriesScanOperator seriesScanOperator3 =
-          new SeriesScanOperator(
-              driverContext.getOperatorContexts().get(2),
-              planNodeId3,
-              measurementPath3,
-              Ordering.ASC,
-              scanOptionsBuilder.build());
-      seriesScanOperator3.initQueryDataSource(new 
QueryDataSource(seqResources, unSeqResources));
-      seriesScanOperator3
-          .getOperatorContext()
-          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
-      TimeJoinOperator timeJoinOperator =
-          new TimeJoinOperator(
-              driverContext.getOperatorContexts().get(3),
-              Arrays.asList(seriesScanOperator1, seriesScanOperator2, 
seriesScanOperator3),
-              Ordering.DESC,
-              Arrays.asList(TSDataType.INT32, TSDataType.INT32, 
TSDataType.INT32),
-              Arrays.asList(
-                  new SingleColumnMerger(new InputLocation(0, 0), new 
DescTimeComparator()),
-                  new SingleColumnMerger(new InputLocation(1, 0), new 
DescTimeComparator()),
-                  new SingleColumnMerger(new InputLocation(2, 0), new 
DescTimeComparator())),
-              new DescTimeComparator());
-      int count = 499;
-      while (timeJoinOperator.hasNext()) {
-        TsBlock tsBlock = timeJoinOperator.next();
-        assertEquals(3, tsBlock.getValueColumnCount());
-        assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
-        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
-        assertTrue(tsBlock.getColumn(2) instanceof RunLengthEncodedColumn);
-        for (int i = 0; i < tsBlock.getPositionCount(); i++, count--) {
-          assertEquals(count, tsBlock.getTimeByIndex(i));
-          assertTrue(tsBlock.getColumn(2).isNull(i));
-          if ((long) count < 200) {
-            assertEquals(20000 + (long) count, tsBlock.getColumn(0).getInt(i));
-            assertEquals(20000 + (long) count, tsBlock.getColumn(1).getInt(i));
-          } else if ((long) count < 260
-              || ((long) count >= 300 && (long) count < 380)
-              || (long) count >= 400) {
-            assertEquals(10000 + (long) count, tsBlock.getColumn(0).getInt(i));
-            assertEquals(10000 + (long) count, tsBlock.getColumn(1).getInt(i));
-          } else {
-            assertEquals(count, tsBlock.getColumn(0).getInt(i));
-            assertEquals(count, tsBlock.getColumn(1).getInt(i));
-          }
-        }
-      }
-      assertEquals(-1, count);
-    } catch (IllegalPathException e) {
-      e.printStackTrace();
-      fail();
-    } finally {
-      instanceNotificationExecutor.shutdown();
-    }
-  }
-}

Reply via email to