This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TimeJoin
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/TimeJoin by this push:
     new 7a54e9a63f0 add UT for InnerTimeJoinOperator
7a54e9a63f0 is described below

commit 7a54e9a63f0ba6a50c581326978a86eca65d5cca
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Dec 21 12:09:51 2023 +0800

    add UT for InnerTimeJoinOperator
---
 .../process/join/InnerTimeJoinOperator.java        |  33 +-
 .../process/join/InnerTimeJoinOperatorTest.java    | 733 +++++++++++++++++++--
 .../join/LeftOuterTimeJoinOperatorTest.java        |  33 +-
 3 files changed, 727 insertions(+), 72 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperator.java
index 959683fd52f..bb36e02726c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperator.java
@@ -142,12 +142,17 @@ public class InnerTimeJoinOperator implements 
ProcessOperator {
       int[][] selectedRowIndexArray = buildTimeColumn(currentEndTime);
 
       // build value columns for each child
-      int columnIndex = 0;
-      for (int i = 0; i < inputOperatorsCount; i++) {
-        columnIndex += buildValueColumns(columnIndex, i, 
selectedRowIndexArray[i]);
+      if (selectedRowIndexArray[0].length > 0) {
+        int columnIndex = 0;
+        for (int i = 0; i < inputOperatorsCount; i++) {
+          columnIndex += buildValueColumns(columnIndex, i, 
selectedRowIndexArray[i]);
+        }
       }
     }
 
+    // set corresponding inputTsBlock to null if its index already reach its 
size, friendly for gc
+    cleanUpInputTsBlock();
+
     TsBlock res = resultBuilder.build();
     resultBuilder.reset();
     return res;
@@ -189,7 +194,7 @@ public class InnerTimeJoinOperator implements 
ProcessOperator {
 
     // update inputIndex for each child to the last index larger than 
currentEndTime
     for (int i = 0; i < inputOperatorsCount; i++) {
-      updateInputIndex(i, currentEndTime);
+      updateInputIndexUntilLargerThan(i, currentEndTime);
     }
 
     return transformListToIntArray(selectedRowIndexArray);
@@ -197,7 +202,7 @@ public class InnerTimeJoinOperator implements 
ProcessOperator {
 
   private void appendOneSelectedRow(List<List<Integer>> selectedRowIndexArray) 
{
     for (int i = 0; i < inputOperatorsCount; i++) {
-      selectedRowIndexArray.get(0).add(inputIndex[i] - 1);
+      selectedRowIndexArray.get(i).add(inputIndex[i] - 1);
     }
   }
 
@@ -209,6 +214,24 @@ public class InnerTimeJoinOperator implements 
ProcessOperator {
     }
   }
 
+  private void updateInputIndexUntilLargerThan(int i, long currentEndTime) {
+    int size = inputTsBlocks[i].getPositionCount();
+    while (inputIndex[i] < size
+        && comparator.canContinueInclusive(
+            inputTsBlocks[i].getTimeByIndex(inputIndex[i]), currentEndTime)) {
+      inputIndex[i]++;
+    }
+  }
+
+  private void cleanUpInputTsBlock() {
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (inputTsBlocks[i].getPositionCount() == inputIndex[i]) {
+        inputTsBlocks[i] = null;
+        inputIndex[i] = 0;
+      }
+    }
+  }
+
   private int[][] transformListToIntArray(List<List<Integer>> lists) {
     if (lists.size() <= 1) {
       throw new IllegalStateException(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperatorTest.java
index 181799c7f52..7d05e0f23bc 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperatorTest.java
@@ -27,11 +27,14 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.units.Duration;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
@@ -42,7 +45,7 @@ import static org.junit.Assert.fail;
 public class InnerTimeJoinOperatorTest {
 
   @Test
-  public void testLeftOuterJoin1() {
+  public void testInnerJoin1() {
     // left table
     // Time, s1
     // 4     4
@@ -76,12 +79,10 @@ public class InnerTimeJoinOperatorTest {
     // result table
     // Time, s1,    s2
     // 4,     4,    40
-    // 6,     6,    null
-    // 9      9,    null
     // 13     13,   130
-    // 17     17,   null
-    // 22     22,   null
-    // 25     25,   null
+
+    OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+    Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1, 
TimeUnit.SECONDS));
 
     Operator leftChild =
         new Operator() {
@@ -112,7 +113,7 @@ public class InnerTimeJoinOperatorTest {
 
           @Override
           public OperatorContext getOperatorContext() {
-            return null;
+            return operatorContext;
           }
 
           @Override
@@ -194,7 +195,7 @@ public class InnerTimeJoinOperatorTest {
 
           @Override
           public OperatorContext getOperatorContext() {
-            return null;
+            return operatorContext;
           }
 
           @Override
@@ -244,36 +245,33 @@ public class InnerTimeJoinOperatorTest {
           }
         };
 
-    OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
-    Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1, 
TimeUnit.SECONDS));
-
-    LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
-        new LeftOuterTimeJoinOperator(
+    InnerTimeJoinOperator innerTimeJoinOperator =
+        new InnerTimeJoinOperator(
             operatorContext,
-            leftChild,
-            1,
-            rightChild,
+            Arrays.asList(leftChild, rightChild),
             Arrays.asList(TSDataType.INT32, TSDataType.INT64),
             new AscTimeComparator());
 
     assertEquals(
         TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes() 
+ 64 * 1024 * 2,
-        leftOuterTimeJoinOperator.calculateMaxPeekMemory());
+        innerTimeJoinOperator.calculateMaxPeekMemory());
     assertEquals(
         TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
-        leftOuterTimeJoinOperator.calculateMaxReturnSize());
-    assertEquals(64 * 1024 * 2, 
leftOuterTimeJoinOperator.calculateRetainedSizeAfterCallingNext());
+        innerTimeJoinOperator.calculateMaxReturnSize());
+    assertEquals(64 * 1024, 
innerTimeJoinOperator.calculateRetainedSizeAfterCallingNext());
 
-    long[] timeArray = new long[] {4L, 6L, 9L, 13L, 17L, 22L, 25L};
-    int[] column1Array = new int[] {4, 6, 9, 13, 17, 22, 25};
-    boolean[] column1IsNull = new boolean[] {false, false, false, false, 
false, false, false};
-    long[] column2Array = new long[] {40L, 0L, 0L, 130L, 0L, 0L, 0L};
-    boolean[] column2IsNull = new boolean[] {false, true, true, false, true, 
true, true};
+    long[] timeArray = new long[] {4L, 13L};
+    int[] column1Array = new int[] {4, 13};
+    boolean[] column1IsNull = new boolean[] {false, false};
+    long[] column2Array = new long[] {40L, 130L};
+    boolean[] column2IsNull = new boolean[] {false, false};
 
     try {
       int count = 0;
-      while (leftOuterTimeJoinOperator.hasNext()) {
-        TsBlock tsBlock = leftOuterTimeJoinOperator.next();
+      ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
+      listenableFuture.get();
+      while (!innerTimeJoinOperator.isFinished() && 
innerTimeJoinOperator.hasNext()) {
+        TsBlock tsBlock = innerTimeJoinOperator.next();
         if (tsBlock != null && !tsBlock.isEmpty()) {
           for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
             assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
@@ -287,6 +285,8 @@ public class InnerTimeJoinOperatorTest {
             }
           }
         }
+        listenableFuture = innerTimeJoinOperator.isBlocked();
+        listenableFuture.get();
       }
       assertEquals(timeArray.length, count);
     } catch (Exception e) {
@@ -296,7 +296,7 @@ public class InnerTimeJoinOperatorTest {
   }
 
   @Test
-  public void testLeftOuterJoin2() {
+  public void testInnerJoin2() {
     // left table
     // Time, s1,     s2
     // 25    null    26
@@ -348,16 +348,15 @@ public class InnerTimeJoinOperatorTest {
 
     // result table
     // Time, s1,     s2,     s3,     s4
-    // 25    null    26      null    null
-    // 22    22      null    null    null
     // 19    19      20      190.0   true
     // 18    18      null    180.0   null
     // 15    null    16      null    false
-    // 9     null    null    null    null
     // 7     7       null    null    false
-    // 6     null    7       null    null
     // 3     3       4       30.0    false
 
+    OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+    Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new 
Duration(1000, TimeUnit.SECONDS));
+
     Operator leftChild =
         new Operator() {
           private final long[][] timeArray =
@@ -379,7 +378,7 @@ public class InnerTimeJoinOperatorTest {
 
           @Override
           public OperatorContext getOperatorContext() {
-            return null;
+            return operatorContext;
           }
 
           @Override
@@ -501,7 +500,7 @@ public class InnerTimeJoinOperatorTest {
 
           @Override
           public OperatorContext getOperatorContext() {
-            return null;
+            return operatorContext;
           }
 
           @Override
@@ -560,37 +559,303 @@ public class InnerTimeJoinOperatorTest {
           }
         };
 
+    InnerTimeJoinOperator innerTimeJoinOperator =
+        new InnerTimeJoinOperator(
+            operatorContext,
+            Arrays.asList(leftChild, rightChild),
+            Arrays.asList(TSDataType.INT32, TSDataType.INT64, 
TSDataType.FLOAT, TSDataType.BOOLEAN),
+            new DescTimeComparator());
+
+    long[] timeArray = new long[] {19L, 18L, 15L, 7L, 3L};
+    int[] column1Array = new int[] {19, 18, 0, 7, 3};
+    boolean[] column1IsNull = new boolean[] {false, false, true, false, false};
+    long[] column2Array = new long[] {20L, 0L, 16L, 0L, 4L};
+    boolean[] column2IsNull = new boolean[] {false, true, false, true, false};
+    float[] column3Array = new float[] {190.0f, 180.0f, 0.0f, 0.0f, 30.0f};
+    boolean[] column3IsNull = new boolean[] {false, false, true, true, false};
+    boolean[] column4Array = new boolean[] {true, false, false, false, false};
+    boolean[] column4IsNull = new boolean[] {false, true, false, false, false};
+
+    try {
+      int count = 0;
+      ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
+      listenableFuture.get();
+      while (!innerTimeJoinOperator.isFinished() && 
innerTimeJoinOperator.hasNext()) {
+        TsBlock tsBlock = innerTimeJoinOperator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
+            assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+            assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+            if (!column1IsNull[count]) {
+              assertEquals(column1Array[count], 
tsBlock.getColumn(0).getInt(i));
+            }
+            assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+            if (!column2IsNull[count]) {
+              assertEquals(column2Array[count], 
tsBlock.getColumn(1).getLong(i));
+            }
+            assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+            if (!column3IsNull[count]) {
+              assertEquals(column3Array[count], 
tsBlock.getColumn(2).getFloat(i), 0.000001);
+            }
+            assertEquals(column4IsNull[count], tsBlock.getColumn(3).isNull(i));
+            if (!column4IsNull[count]) {
+              assertEquals(column4Array[count], 
tsBlock.getColumn(3).getBoolean(i));
+            }
+          }
+        }
+        listenableFuture = innerTimeJoinOperator.isBlocked();
+        listenableFuture.get();
+      }
+      assertEquals(timeArray.length, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testInnerJoin3() {
+    // child-1
+    // Time, s1,     s2
+    // 100   100     200
+    // 90    90      180
+    // 80    80      160
+    // 70    70      140
+    // 60    60      120
+    // 50    50      100
+    // 40    40      80
+    // 30    30      60
+    // 20    20      40
+    // 10    10      20
+    // 0     0       0
+    // ---------------------- TsBlock-1
+
+    // child-2
+    // Time,   s3,        s4
+    // 1000    3000.0     false
+    // 500     500.0      true
+    // 100     300.0      null
+    // ------------------------- TsBlock-1
+    // 99      99.0       true
+    // 95      95.0       null
+    // 90      null       false
+    // ------------------------- TsBlock-2
+    // 50      150.0      true
+    // 48      48.0       true
+    // 20      60.0       null
+    // 10      null       false
+    // ------------------------- TsBlock-3
+
+    // result table
+    // Time, s1,     s2,     s3,     s4
+    // 100   100     200     300.0   null
+    // 90    90      180     null    false
+    // 50    50      100     150.0   true
+    // 20    20      40      60.0    null
+    // 10    10      20      null    false
+
     OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
-    Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1, 
TimeUnit.SECONDS));
+    Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new 
Duration(1000, TimeUnit.SECONDS));
+
+    Operator child1 =
+        new Operator() {
+          private final long[][] timeArray =
+              new long[][] {{100L, 90L, 80L, 70L, 60L, 50L, 40L, 30L, 20L, 
10L, 0L}};
+
+          private final int[][] value1Array =
+              new int[][] {{100, 90, 80, 70, 60, 50, 40, 30, 20, 10, 0}};
+
+          private final long[][] value2Array =
+              new long[][] {{200L, 180L, 160, 140, 120, 100L, 80L, 60L, 40L, 
20L, 0L}};
+
+          private final boolean[][][] valueIsNull =
+              new boolean[][][] {
+                {{false, false, false, false, false, false, false, false, 
false, false, false}},
+                {{false, false, false, false, false, false, false, false, 
false, false, false}}
+              };
+
+          private int index = 0;
+
+          @Override
+          public OperatorContext getOperatorContext() {
+            return operatorContext;
+          }
+
+          @Override
+          public TsBlock next() {
+            if (timeArray[index] == null) {
+              index++;
+              return null;
+            }
+            TsBlockBuilder builder =
+                new TsBlockBuilder(
+                    timeArray[index].length, Arrays.asList(TSDataType.INT32, 
TSDataType.INT64));
+            for (int i = 0, size = timeArray[index].length; i < size; i++) {
+              builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+              if (valueIsNull[0][index][i]) {
+                builder.getColumnBuilder(0).appendNull();
+              } else {
+                builder.getColumnBuilder(0).writeInt(value1Array[index][i]);
+              }
+              if (valueIsNull[1][index][i]) {
+                builder.getColumnBuilder(1).appendNull();
+              } else {
+                builder.getColumnBuilder(1).writeLong(value2Array[index][i]);
+              }
+            }
+            builder.declarePositions(timeArray[index].length);
+            index++;
+            return builder.build();
+          }
+
+          @Override
+          public boolean hasNext() {
+            return index < 1;
+          }
+
+          @Override
+          public void close() {}
+
+          @Override
+          public boolean isFinished() {
+            return index >= 1;
+          }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 64 * 1024;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 64 * 1024;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
+        };
+
+    Operator child2 =
+        new Operator() {
+          private final long[][] timeArray =
+              new long[][] {{1000L, 500L, 100L}, {99L, 95L, 90L}, {50L, 48L, 
20L, 10L}};
+
+          private final float[][] value1Array =
+              new float[][] {
+                {3000.0f, 500.0f, 300.0f},
+                {99.0f, 95.0f, 0.0f},
+                {150.0f, 48.0f, 60.0f, 0.0f}
+              };
+
+          private final boolean[][] value2Array =
+              new boolean[][] {
+                {false, true, false},
+                {true, false, false},
+                {true, true, false, false}
+              };
+
+          private final boolean[][][] valueIsNull =
+              new boolean[][][] {
+                {
+                  {false, false, false},
+                  {false, false, true},
+                  {false, false, false, true}
+                },
+                {
+                  {false, false, true},
+                  {false, true, false},
+                  {false, false, true, false}
+                }
+              };
+
+          private int index = 0;
+
+          @Override
+          public OperatorContext getOperatorContext() {
+            return operatorContext;
+          }
+
+          @Override
+          public TsBlock next() {
+            if (timeArray[index] == null) {
+              index++;
+              return null;
+            }
+            TsBlockBuilder builder =
+                new TsBlockBuilder(
+                    timeArray[index].length, Arrays.asList(TSDataType.FLOAT, 
TSDataType.BOOLEAN));
+            for (int i = 0, size = timeArray[index].length; i < size; i++) {
+              builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+              if (valueIsNull[0][index][i]) {
+                builder.getColumnBuilder(0).appendNull();
+              } else {
+                builder.getColumnBuilder(0).writeFloat(value1Array[index][i]);
+              }
+              if (valueIsNull[1][index][i]) {
+                builder.getColumnBuilder(1).appendNull();
+              } else {
+                
builder.getColumnBuilder(1).writeBoolean(value2Array[index][i]);
+              }
+            }
+            builder.declarePositions(timeArray[index].length);
+            index++;
+            return builder.build();
+          }
+
+          @Override
+          public boolean hasNext() {
+            return index < 3;
+          }
+
+          @Override
+          public void close() {}
+
+          @Override
+          public boolean isFinished() {
+            return index >= 3;
+          }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 64 * 1024;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 64 * 1024;
+          }
 
-    LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
-        new LeftOuterTimeJoinOperator(
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
+        };
+
+    InnerTimeJoinOperator innerTimeJoinOperator =
+        new InnerTimeJoinOperator(
             operatorContext,
-            leftChild,
-            2,
-            rightChild,
+            Arrays.asList(child1, child2),
             Arrays.asList(TSDataType.INT32, TSDataType.INT64, 
TSDataType.FLOAT, TSDataType.BOOLEAN),
             new DescTimeComparator());
 
-    long[] timeArray = new long[] {25L, 22L, 19L, 18L, 15L, 9L, 7L, 6L, 3L};
-    int[] column1Array = new int[] {0, 22, 19, 18, 0, 0, 7, 0, 3};
-    boolean[] column1IsNull =
-        new boolean[] {true, false, false, false, true, true, false, true, 
false};
-    long[] column2Array = new long[] {26L, 0L, 20L, 0L, 16L, 0L, 0L, 7L, 4L};
-    boolean[] column2IsNull =
-        new boolean[] {false, true, false, true, false, true, true, false, 
false};
-    float[] column3Array = new float[] {0.0f, 0.0f, 190.0f, 180.0f, 0.0f, 
0.0f, 0.0f, 0.0f, 30.0f};
-    boolean[] column3IsNull =
-        new boolean[] {true, true, false, false, true, true, true, true, 
false};
-    boolean[] column4Array =
-        new boolean[] {false, false, true, false, false, false, false, false, 
false};
-    boolean[] column4IsNull =
-        new boolean[] {true, true, false, true, false, true, false, true, 
false};
+    long[] timeArray = new long[] {100L, 90L, 50L, 20L, 10L};
+    int[] column1Array = new int[] {100, 90, 50, 20, 10};
+    boolean[] column1IsNull = new boolean[] {false, false, false, false, 
false};
+    long[] column2Array = new long[] {200L, 180L, 100L, 40L, 20L};
+    boolean[] column2IsNull = new boolean[] {false, false, false, false, 
false};
+    float[] column3Array = new float[] {300.0f, 0.0f, 150.0f, 60.0f, 0.0f};
+    boolean[] column3IsNull = new boolean[] {false, true, false, false, true};
+    boolean[] column4Array = new boolean[] {false, false, true, false, false};
+    boolean[] column4IsNull = new boolean[] {true, false, false, true, false};
 
     try {
       int count = 0;
-      while (leftOuterTimeJoinOperator.hasNext()) {
-        TsBlock tsBlock = leftOuterTimeJoinOperator.next();
+      ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
+      listenableFuture.get();
+      while (!innerTimeJoinOperator.isFinished() && 
innerTimeJoinOperator.hasNext()) {
+        TsBlock tsBlock = innerTimeJoinOperator.next();
         if (tsBlock != null && !tsBlock.isEmpty()) {
           for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
             assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
@@ -612,6 +877,8 @@ public class InnerTimeJoinOperatorTest {
             }
           }
         }
+        listenableFuture = innerTimeJoinOperator.isBlocked();
+        listenableFuture.get();
       }
       assertEquals(timeArray.length, count);
     } catch (Exception e) {
@@ -619,4 +886,360 @@ public class InnerTimeJoinOperatorTest {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void testInnerJoin4() {
+    // child-1
+    // Time, s1,     s2
+    // 100   100     200
+    // 90    90      180
+    // 80    80      160
+    // 70    70      140
+    // 60    60      120
+    // 50    50      100
+    // 40    40      80
+    // 30    30      60
+    // 20    20      40
+    // 10    10      20
+    // 0     0       0
+    // ---------------------- TsBlock-1
+
+    // child-2
+    // Time,   s3,        s4
+    // 1000    3000.0     false
+    // 500     500.0      true
+    // 100     300.0      null
+    // ------------------------- TsBlock-1
+    // 99      99.0       true
+    // 95      95.0       null
+    // 90      null       false
+    // ------------------------- TsBlock-2
+    // 50      150.0      true
+    // 48      48.0       true
+    // 20      60.0       null
+    // 10      null       false
+    // ------------------------- TsBlock-3
+
+    // child-3
+    // Time,   s5,
+    // 1000    "iotdb"
+    // 500     "ty"
+    // 101     "zm"
+    // --------------------- TsBlock-1
+    // 99      "ty"
+    // 95      "love"
+    // 80      "zm"
+    // 60      "2018-05-06"
+    // --------------------- TsBlock-2
+    // 40      "1997-09-09"
+    // 22      "1995-04-21"
+    // 11      "2022-04-21"
+    // 0       "2023-12-30"
+    // --------------------- TsBlock-3
+
+    // result table
+    // Time, s1,     s2,     s3,     s4
+    // empty
+
+    OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+    Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new 
Duration(1000, TimeUnit.SECONDS));
+
+    Operator child1 =
+        new Operator() {
+          private final long[][] timeArray =
+              new long[][] {{100L, 90L, 80L, 70L, 60L, 50L, 40L, 30L, 20L, 
10L, 0L}};
+
+          private final int[][] value1Array =
+              new int[][] {{100, 90, 80, 70, 60, 50, 40, 30, 20, 10, 0}};
+
+          private final long[][] value2Array =
+              new long[][] {{200L, 180L, 160, 140, 120, 100L, 80L, 60L, 40L, 
20L, 0L}};
+
+          private final boolean[][][] valueIsNull =
+              new boolean[][][] {
+                {{false, false, false, false, false, false, false, false, 
false, false, false}},
+                {{false, false, false, false, false, false, false, false, 
false, false, false}}
+              };
+
+          private int index = 0;
+
+          @Override
+          public OperatorContext getOperatorContext() {
+            return operatorContext;
+          }
+
+          @Override
+          public TsBlock next() {
+            if (timeArray[index] == null) {
+              index++;
+              return null;
+            }
+            TsBlockBuilder builder =
+                new TsBlockBuilder(
+                    timeArray[index].length, Arrays.asList(TSDataType.INT32, 
TSDataType.INT64));
+            for (int i = 0, size = timeArray[index].length; i < size; i++) {
+              builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+              if (valueIsNull[0][index][i]) {
+                builder.getColumnBuilder(0).appendNull();
+              } else {
+                builder.getColumnBuilder(0).writeInt(value1Array[index][i]);
+              }
+              if (valueIsNull[1][index][i]) {
+                builder.getColumnBuilder(1).appendNull();
+              } else {
+                builder.getColumnBuilder(1).writeLong(value2Array[index][i]);
+              }
+            }
+            builder.declarePositions(timeArray[index].length);
+            index++;
+            return builder.build();
+          }
+
+          @Override
+          public boolean hasNext() {
+            return index < 1;
+          }
+
+          @Override
+          public void close() {}
+
+          @Override
+          public boolean isFinished() {
+            return index >= 1;
+          }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 64 * 1024;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 64 * 1024;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
+        };
+
+    Operator child2 =
+        new Operator() {
+          private final long[][] timeArray =
+              new long[][] {{1000L, 500L, 100L}, {99L, 95L, 90L}, {50L, 48L, 
20L, 10L}};
+
+          private final float[][] value1Array =
+              new float[][] {
+                {3000.0f, 500.0f, 300.0f},
+                {99.0f, 95.0f, 0.0f},
+                {150.0f, 48.0f, 60.0f, 0.0f}
+              };
+
+          private final boolean[][] value2Array =
+              new boolean[][] {
+                {false, true, false},
+                {true, false, false},
+                {true, true, false, false}
+              };
+
+          private final boolean[][][] valueIsNull =
+              new boolean[][][] {
+                {
+                  {false, false, false},
+                  {false, false, true},
+                  {false, false, false, true}
+                },
+                {
+                  {false, false, true},
+                  {false, true, false},
+                  {false, false, true, false}
+                }
+              };
+
+          private int index = 0;
+
+          @Override
+          public OperatorContext getOperatorContext() {
+            return operatorContext;
+          }
+
+          @Override
+          public TsBlock next() {
+            if (timeArray[index] == null) {
+              index++;
+              return null;
+            }
+            TsBlockBuilder builder =
+                new TsBlockBuilder(
+                    timeArray[index].length, Arrays.asList(TSDataType.FLOAT, 
TSDataType.BOOLEAN));
+            for (int i = 0, size = timeArray[index].length; i < size; i++) {
+              builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+              if (valueIsNull[0][index][i]) {
+                builder.getColumnBuilder(0).appendNull();
+              } else {
+                builder.getColumnBuilder(0).writeFloat(value1Array[index][i]);
+              }
+              if (valueIsNull[1][index][i]) {
+                builder.getColumnBuilder(1).appendNull();
+              } else {
+                
builder.getColumnBuilder(1).writeBoolean(value2Array[index][i]);
+              }
+            }
+            builder.declarePositions(timeArray[index].length);
+            index++;
+            return builder.build();
+          }
+
+          @Override
+          public boolean hasNext() {
+            return index < 3;
+          }
+
+          @Override
+          public void close() {}
+
+          @Override
+          public boolean isFinished() {
+            return index >= 3;
+          }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 64 * 1024;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 64 * 1024;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
+        };
+
+    Operator child3 =
+        new Operator() {
+          private final long[][] timeArray =
+              new long[][] {{1000L, 500L, 101L}, {99L, 95L, 80L, 60L}, {40L, 
22L, 11L, 0L}};
+
+          private final Binary[][] value1Array =
+              new Binary[][] {
+                {
+                  new Binary("iotdb".getBytes(StandardCharsets.UTF_8)),
+                  new Binary("ty".getBytes(StandardCharsets.UTF_8)),
+                  new Binary("zm".getBytes(StandardCharsets.UTF_8))
+                },
+                {
+                  new Binary("ty".getBytes(StandardCharsets.UTF_8)),
+                  new Binary("love".getBytes(StandardCharsets.UTF_8)),
+                  new Binary("zm".getBytes(StandardCharsets.UTF_8)),
+                  new Binary("2018-05-06".getBytes(StandardCharsets.UTF_8))
+                },
+                {
+                  new Binary("1997-09-09".getBytes(StandardCharsets.UTF_8)),
+                  new Binary("1995-04-21".getBytes(StandardCharsets.UTF_8)),
+                  new Binary("2022-04-21".getBytes(StandardCharsets.UTF_8)),
+                  new Binary("2023-12-30".getBytes(StandardCharsets.UTF_8))
+                }
+              };
+
+          private final boolean[][][] valueIsNull =
+              new boolean[][][] {
+                {
+                  {false, false, false},
+                  {false, false, false, false},
+                  {false, false, false, false}
+                }
+              };
+
+          private int index = 0;
+
+          @Override
+          public OperatorContext getOperatorContext() {
+            return operatorContext;
+          }
+
+          @Override
+          public TsBlock next() {
+            if (timeArray[index] == null) {
+              index++;
+              return null;
+            }
+            TsBlockBuilder builder =
+                new TsBlockBuilder(timeArray[index].length, 
Arrays.asList(TSDataType.TEXT));
+            for (int i = 0, size = timeArray[index].length; i < size; i++) {
+              builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+              if (valueIsNull[0][index][i]) {
+                builder.getColumnBuilder(0).appendNull();
+              } else {
+                builder.getColumnBuilder(0).writeBinary(value1Array[index][i]);
+              }
+            }
+            builder.declarePositions(timeArray[index].length);
+            index++;
+            return builder.build();
+          }
+
+          @Override
+          public boolean hasNext() {
+            return index < 3;
+          }
+
+          @Override
+          public void close() {}
+
+          @Override
+          public boolean isFinished() {
+            return index >= 3;
+          }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 64 * 1024;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 64 * 1024;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
+        };
+
+    InnerTimeJoinOperator innerTimeJoinOperator =
+        new InnerTimeJoinOperator(
+            operatorContext,
+            Arrays.asList(child1, child2, child3),
+            Arrays.asList(
+                TSDataType.INT32,
+                TSDataType.INT64,
+                TSDataType.FLOAT,
+                TSDataType.BOOLEAN,
+                TSDataType.TEXT),
+            new DescTimeComparator());
+
+    try {
+      int count = 0;
+      ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
+      listenableFuture.get();
+      while (!innerTimeJoinOperator.isFinished() && 
innerTimeJoinOperator.hasNext()) {
+        TsBlock tsBlock = innerTimeJoinOperator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          count += tsBlock.getPositionCount();
+        }
+        listenableFuture = innerTimeJoinOperator.isBlocked();
+        listenableFuture.get();
+      }
+      assertEquals(0, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperatorTest.java
index a3488806834..bf50a037829 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.units.Duration;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -83,6 +84,9 @@ public class LeftOuterTimeJoinOperatorTest {
     // 22     22,   null
     // 25     25,   null
 
+    OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+    Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1, 
TimeUnit.SECONDS));
+
     Operator leftChild =
         new Operator() {
           private final long[][] timeArray =
@@ -112,7 +116,7 @@ public class LeftOuterTimeJoinOperatorTest {
 
           @Override
           public OperatorContext getOperatorContext() {
-            return null;
+            return operatorContext;
           }
 
           @Override
@@ -194,7 +198,7 @@ public class LeftOuterTimeJoinOperatorTest {
 
           @Override
           public OperatorContext getOperatorContext() {
-            return null;
+            return operatorContext;
           }
 
           @Override
@@ -244,9 +248,6 @@ public class LeftOuterTimeJoinOperatorTest {
           }
         };
 
-    OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
-    Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1, 
TimeUnit.SECONDS));
-
     LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
         new LeftOuterTimeJoinOperator(
             operatorContext,
@@ -272,7 +273,9 @@ public class LeftOuterTimeJoinOperatorTest {
 
     try {
       int count = 0;
-      while (leftOuterTimeJoinOperator.hasNext()) {
+      ListenableFuture<?> listenableFuture = 
leftOuterTimeJoinOperator.isBlocked();
+      listenableFuture.get();
+      while (!leftOuterTimeJoinOperator.isFinished() && 
leftOuterTimeJoinOperator.hasNext()) {
         TsBlock tsBlock = leftOuterTimeJoinOperator.next();
         if (tsBlock != null && !tsBlock.isEmpty()) {
           for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
@@ -287,6 +290,8 @@ public class LeftOuterTimeJoinOperatorTest {
             }
           }
         }
+        listenableFuture = leftOuterTimeJoinOperator.isBlocked();
+        listenableFuture.get();
       }
       assertEquals(timeArray.length, count);
     } catch (Exception e) {
@@ -358,6 +363,9 @@ public class LeftOuterTimeJoinOperatorTest {
     // 6     null    7       null    null
     // 3     3       4       30.0    false
 
+    OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+    Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1, 
TimeUnit.SECONDS));
+
     Operator leftChild =
         new Operator() {
           private final long[][] timeArray =
@@ -379,7 +387,7 @@ public class LeftOuterTimeJoinOperatorTest {
 
           @Override
           public OperatorContext getOperatorContext() {
-            return null;
+            return operatorContext;
           }
 
           @Override
@@ -501,7 +509,7 @@ public class LeftOuterTimeJoinOperatorTest {
 
           @Override
           public OperatorContext getOperatorContext() {
-            return null;
+            return operatorContext;
           }
 
           @Override
@@ -560,9 +568,6 @@ public class LeftOuterTimeJoinOperatorTest {
           }
         };
 
-    OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
-    Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1, 
TimeUnit.SECONDS));
-
     LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
         new LeftOuterTimeJoinOperator(
             operatorContext,
@@ -589,7 +594,9 @@ public class LeftOuterTimeJoinOperatorTest {
 
     try {
       int count = 0;
-      while (leftOuterTimeJoinOperator.hasNext()) {
+      ListenableFuture<?> listenableFuture = 
leftOuterTimeJoinOperator.isBlocked();
+      listenableFuture.get();
+      while (!leftOuterTimeJoinOperator.isFinished() && 
leftOuterTimeJoinOperator.hasNext()) {
         TsBlock tsBlock = leftOuterTimeJoinOperator.next();
         if (tsBlock != null && !tsBlock.isEmpty()) {
           for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++, 
count++) {
@@ -612,6 +619,8 @@ public class LeftOuterTimeJoinOperatorTest {
             }
           }
         }
+        listenableFuture = leftOuterTimeJoinOperator.isBlocked();
+        listenableFuture.get();
       }
       assertEquals(timeArray.length, count);
     } catch (Exception e) {


Reply via email to