This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TimeJoin
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TimeJoin by this push:
new 7a54e9a63f0 add UT for InnerTimeJoinOperator
7a54e9a63f0 is described below
commit 7a54e9a63f0ba6a50c581326978a86eca65d5cca
Author: JackieTien97 <[email protected]>
AuthorDate: Thu Dec 21 12:09:51 2023 +0800
add UT for InnerTimeJoinOperator
---
.../process/join/InnerTimeJoinOperator.java | 33 +-
.../process/join/InnerTimeJoinOperatorTest.java | 733 +++++++++++++++++++--
.../join/LeftOuterTimeJoinOperatorTest.java | 33 +-
3 files changed, 727 insertions(+), 72 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperator.java
index 959683fd52f..bb36e02726c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperator.java
@@ -142,12 +142,17 @@ public class InnerTimeJoinOperator implements
ProcessOperator {
int[][] selectedRowIndexArray = buildTimeColumn(currentEndTime);
// build value columns for each child
- int columnIndex = 0;
- for (int i = 0; i < inputOperatorsCount; i++) {
- columnIndex += buildValueColumns(columnIndex, i,
selectedRowIndexArray[i]);
+ if (selectedRowIndexArray[0].length > 0) {
+ int columnIndex = 0;
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ columnIndex += buildValueColumns(columnIndex, i,
selectedRowIndexArray[i]);
+ }
}
}
+ // set the corresponding inputTsBlock to null once its index has reached its
size, so the fully consumed block can be garbage collected
+ cleanUpInputTsBlock();
+
TsBlock res = resultBuilder.build();
resultBuilder.reset();
return res;
@@ -189,7 +194,7 @@ public class InnerTimeJoinOperator implements
ProcessOperator {
// update inputIndex for each child to the last index larger than
currentEndTime
for (int i = 0; i < inputOperatorsCount; i++) {
- updateInputIndex(i, currentEndTime);
+ updateInputIndexUntilLargerThan(i, currentEndTime);
}
return transformListToIntArray(selectedRowIndexArray);
@@ -197,7 +202,7 @@ public class InnerTimeJoinOperator implements
ProcessOperator {
private void appendOneSelectedRow(List<List<Integer>> selectedRowIndexArray)
{
for (int i = 0; i < inputOperatorsCount; i++) {
- selectedRowIndexArray.get(0).add(inputIndex[i] - 1);
+ selectedRowIndexArray.get(i).add(inputIndex[i] - 1);
}
}
@@ -209,6 +214,24 @@ public class InnerTimeJoinOperator implements
ProcessOperator {
}
}
+ private void updateInputIndexUntilLargerThan(int i, long currentEndTime) { // moves child i's read cursor (inputIndex[i]) forward; the TsBlock itself is not modified
+ int size = inputTsBlocks[i].getPositionCount(); // number of rows in child i's current block
+ while (inputIndex[i] < size // stop when the block is exhausted ...
+ && comparator.canContinueInclusive( // ... or at the first row the comparator rejects for currentEndTime
+ inputTsBlocks[i].getTimeByIndex(inputIndex[i]), currentEndTime)) { // NOTE(review): presumably "row time not past currentEndTime in scan order" - confirm against TimeComparator
+ inputIndex[i]++;
+ }
+ }
+
+ private void cleanUpInputTsBlock() { // drops every child block whose rows have all been consumed
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (inputTsBlocks[i].getPositionCount() == inputIndex[i]) { // cursor sits at the end of the block; NOTE(review): dereferences inputTsBlocks[i] unguarded - assumes no slot is null here, confirm at the call site
+ inputTsBlocks[i] = null; // release the reference to the consumed block
+ inputIndex[i] = 0; // reset the cursor to 0 alongside clearing the slot
+ }
+ }
+ }
+
private int[][] transformListToIntArray(List<List<Integer>> lists) {
if (lists.size() <= 1) {
throw new IllegalStateException(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperatorTest.java
index 181799c7f52..7d05e0f23bc 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/InnerTimeJoinOperatorTest.java
@@ -27,11 +27,14 @@ import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import org.junit.Test;
import org.mockito.Mockito;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@@ -42,7 +45,7 @@ import static org.junit.Assert.fail;
public class InnerTimeJoinOperatorTest {
@Test
- public void testLeftOuterJoin1() {
+ public void testInnerJoin1() {
// left table
// Time, s1
// 4 4
@@ -76,12 +79,10 @@ public class InnerTimeJoinOperatorTest {
// result table
// Time, s1, s2
// 4, 4, 40
- // 6, 6, null
- // 9 9, null
// 13 13, 130
- // 17 17, null
- // 22 22, null
- // 25 25, null
+
+ OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+ Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1,
TimeUnit.SECONDS));
Operator leftChild =
new Operator() {
@@ -112,7 +113,7 @@ public class InnerTimeJoinOperatorTest {
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
@Override
@@ -194,7 +195,7 @@ public class InnerTimeJoinOperatorTest {
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
@Override
@@ -244,36 +245,33 @@ public class InnerTimeJoinOperatorTest {
}
};
- OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
- Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1,
TimeUnit.SECONDS));
-
- LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
- new LeftOuterTimeJoinOperator(
+ InnerTimeJoinOperator innerTimeJoinOperator =
+ new InnerTimeJoinOperator(
operatorContext,
- leftChild,
- 1,
- rightChild,
+ Arrays.asList(leftChild, rightChild),
Arrays.asList(TSDataType.INT32, TSDataType.INT64),
new AscTimeComparator());
assertEquals(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes()
+ 64 * 1024 * 2,
- leftOuterTimeJoinOperator.calculateMaxPeekMemory());
+ innerTimeJoinOperator.calculateMaxPeekMemory());
assertEquals(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
- leftOuterTimeJoinOperator.calculateMaxReturnSize());
- assertEquals(64 * 1024 * 2,
leftOuterTimeJoinOperator.calculateRetainedSizeAfterCallingNext());
+ innerTimeJoinOperator.calculateMaxReturnSize());
+ assertEquals(64 * 1024,
innerTimeJoinOperator.calculateRetainedSizeAfterCallingNext());
- long[] timeArray = new long[] {4L, 6L, 9L, 13L, 17L, 22L, 25L};
- int[] column1Array = new int[] {4, 6, 9, 13, 17, 22, 25};
- boolean[] column1IsNull = new boolean[] {false, false, false, false,
false, false, false};
- long[] column2Array = new long[] {40L, 0L, 0L, 130L, 0L, 0L, 0L};
- boolean[] column2IsNull = new boolean[] {false, true, true, false, true,
true, true};
+ long[] timeArray = new long[] {4L, 13L};
+ int[] column1Array = new int[] {4, 13};
+ boolean[] column1IsNull = new boolean[] {false, false};
+ long[] column2Array = new long[] {40L, 130L};
+ boolean[] column2IsNull = new boolean[] {false, false};
try {
int count = 0;
- while (leftOuterTimeJoinOperator.hasNext()) {
- TsBlock tsBlock = leftOuterTimeJoinOperator.next();
+ ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
+ listenableFuture.get();
+ while (!innerTimeJoinOperator.isFinished() &&
innerTimeJoinOperator.hasNext()) {
+ TsBlock tsBlock = innerTimeJoinOperator.next();
if (tsBlock != null && !tsBlock.isEmpty()) {
for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
@@ -287,6 +285,8 @@ public class InnerTimeJoinOperatorTest {
}
}
}
+ listenableFuture = innerTimeJoinOperator.isBlocked();
+ listenableFuture.get();
}
assertEquals(timeArray.length, count);
} catch (Exception e) {
@@ -296,7 +296,7 @@ public class InnerTimeJoinOperatorTest {
}
@Test
- public void testLeftOuterJoin2() {
+ public void testInnerJoin2() {
// left table
// Time, s1, s2
// 25 null 26
@@ -348,16 +348,15 @@ public class InnerTimeJoinOperatorTest {
// result table
// Time, s1, s2, s3, s4
- // 25 null 26 null null
- // 22 22 null null null
// 19 19 20 190.0 true
// 18 18 null 180.0 null
// 15 null 16 null false
- // 9 null null null null
// 7 7 null null false
- // 6 null 7 null null
// 3 3 4 30.0 false
+ OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+ Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new
Duration(1000, TimeUnit.SECONDS));
+
Operator leftChild =
new Operator() {
private final long[][] timeArray =
@@ -379,7 +378,7 @@ public class InnerTimeJoinOperatorTest {
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
@Override
@@ -501,7 +500,7 @@ public class InnerTimeJoinOperatorTest {
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
@Override
@@ -560,37 +559,303 @@ public class InnerTimeJoinOperatorTest {
}
};
+ InnerTimeJoinOperator innerTimeJoinOperator =
+ new InnerTimeJoinOperator(
+ operatorContext,
+ Arrays.asList(leftChild, rightChild),
+ Arrays.asList(TSDataType.INT32, TSDataType.INT64,
TSDataType.FLOAT, TSDataType.BOOLEAN),
+ new DescTimeComparator());
+
+ long[] timeArray = new long[] {19L, 18L, 15L, 7L, 3L};
+ int[] column1Array = new int[] {19, 18, 0, 7, 3};
+ boolean[] column1IsNull = new boolean[] {false, false, true, false, false};
+ long[] column2Array = new long[] {20L, 0L, 16L, 0L, 4L};
+ boolean[] column2IsNull = new boolean[] {false, true, false, true, false};
+ float[] column3Array = new float[] {190.0f, 180.0f, 0.0f, 0.0f, 30.0f};
+ boolean[] column3IsNull = new boolean[] {false, false, true, true, false};
+ boolean[] column4Array = new boolean[] {true, false, false, false, false};
+ boolean[] column4IsNull = new boolean[] {false, true, false, false, false};
+
+ try {
+ int count = 0;
+ ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
+ listenableFuture.get();
+ while (!innerTimeJoinOperator.isFinished() &&
innerTimeJoinOperator.hasNext()) {
+ TsBlock tsBlock = innerTimeJoinOperator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
+ assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+ assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+ if (!column1IsNull[count]) {
+ assertEquals(column1Array[count],
tsBlock.getColumn(0).getInt(i));
+ }
+ assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+ if (!column2IsNull[count]) {
+ assertEquals(column2Array[count],
tsBlock.getColumn(1).getLong(i));
+ }
+ assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+ if (!column3IsNull[count]) {
+ assertEquals(column3Array[count],
tsBlock.getColumn(2).getFloat(i), 0.000001);
+ }
+ assertEquals(column4IsNull[count], tsBlock.getColumn(3).isNull(i));
+ if (!column4IsNull[count]) {
+ assertEquals(column4Array[count],
tsBlock.getColumn(3).getBoolean(i));
+ }
+ }
+ }
+ listenableFuture = innerTimeJoinOperator.isBlocked();
+ listenableFuture.get();
+ }
+ assertEquals(timeArray.length, count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInnerJoin3() {
+ // child-1
+ // Time, s1, s2
+ // 100 100 200
+ // 90 90 180
+ // 80 80 160
+ // 70 70 140
+ // 60 60 120
+ // 50 50 100
+ // 40 40 80
+ // 30 30 60
+ // 20 20 40
+ // 10 10 20
+ // 0 0 0
+ // ---------------------- TsBlock-1
+
+ // child-2
+ // Time, s3, s4
+ // 1000 3000.0 false
+ // 500 500.0 true
+ // 100 300.0 null
+ // ------------------------- TsBlock-1
+ // 99 99.0 true
+ // 95 95.0 null
+ // 90 null false
+ // ------------------------- TsBlock-2
+ // 50 150.0 true
+ // 48 48.0 true
+ // 20 60.0 null
+ // 10 null false
+ // ------------------------- TsBlock-3
+
+ // result table
+ // Time, s1, s2, s3, s4
+ // 100 100 200 300.0 null
+ // 90 90 180 null false
+ // 50 50 100 150.0 true
+ // 20 20 40 60.0 null
+ // 10 10 20 null false
+
OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
- Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1,
TimeUnit.SECONDS));
+ Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new
Duration(1000, TimeUnit.SECONDS));
+
+ Operator child1 =
+ new Operator() {
+ private final long[][] timeArray =
+ new long[][] {{100L, 90L, 80L, 70L, 60L, 50L, 40L, 30L, 20L,
10L, 0L}};
+
+ private final int[][] value1Array =
+ new int[][] {{100, 90, 80, 70, 60, 50, 40, 30, 20, 10, 0}};
+
+ private final long[][] value2Array =
+ new long[][] {{200L, 180L, 160, 140, 120, 100L, 80L, 60L, 40L,
20L, 0L}};
+
+ private final boolean[][][] valueIsNull =
+ new boolean[][][] {
+ {{false, false, false, false, false, false, false, false,
false, false, false}},
+ {{false, false, false, false, false, false, false, false,
false, false, false}}
+ };
+
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() {
+ if (timeArray[index] == null) {
+ index++;
+ return null;
+ }
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ timeArray[index].length, Arrays.asList(TSDataType.INT32,
TSDataType.INT64));
+ for (int i = 0, size = timeArray[index].length; i < size; i++) {
+ builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+ if (valueIsNull[0][index][i]) {
+ builder.getColumnBuilder(0).appendNull();
+ } else {
+ builder.getColumnBuilder(0).writeInt(value1Array[index][i]);
+ }
+ if (valueIsNull[1][index][i]) {
+ builder.getColumnBuilder(1).appendNull();
+ } else {
+ builder.getColumnBuilder(1).writeLong(value2Array[index][i]);
+ }
+ }
+ builder.declarePositions(timeArray[index].length);
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 1;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isFinished() {
+ return index >= 1;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+ };
+
+ Operator child2 =
+ new Operator() {
+ private final long[][] timeArray =
+ new long[][] {{1000L, 500L, 100L}, {99L, 95L, 90L}, {50L, 48L,
20L, 10L}};
+
+ private final float[][] value1Array =
+ new float[][] {
+ {3000.0f, 500.0f, 300.0f},
+ {99.0f, 95.0f, 0.0f},
+ {150.0f, 48.0f, 60.0f, 0.0f}
+ };
+
+ private final boolean[][] value2Array =
+ new boolean[][] {
+ {false, true, false},
+ {true, false, false},
+ {true, true, false, false}
+ };
+
+ private final boolean[][][] valueIsNull =
+ new boolean[][][] {
+ {
+ {false, false, false},
+ {false, false, true},
+ {false, false, false, true}
+ },
+ {
+ {false, false, true},
+ {false, true, false},
+ {false, false, true, false}
+ }
+ };
+
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() {
+ if (timeArray[index] == null) {
+ index++;
+ return null;
+ }
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ timeArray[index].length, Arrays.asList(TSDataType.FLOAT,
TSDataType.BOOLEAN));
+ for (int i = 0, size = timeArray[index].length; i < size; i++) {
+ builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+ if (valueIsNull[0][index][i]) {
+ builder.getColumnBuilder(0).appendNull();
+ } else {
+ builder.getColumnBuilder(0).writeFloat(value1Array[index][i]);
+ }
+ if (valueIsNull[1][index][i]) {
+ builder.getColumnBuilder(1).appendNull();
+ } else {
+
builder.getColumnBuilder(1).writeBoolean(value2Array[index][i]);
+ }
+ }
+ builder.declarePositions(timeArray[index].length);
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 3;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isFinished() {
+ return index >= 3;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 64 * 1024;
+ }
- LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
- new LeftOuterTimeJoinOperator(
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+ };
+
+ InnerTimeJoinOperator innerTimeJoinOperator =
+ new InnerTimeJoinOperator(
operatorContext,
- leftChild,
- 2,
- rightChild,
+ Arrays.asList(child1, child2),
Arrays.asList(TSDataType.INT32, TSDataType.INT64,
TSDataType.FLOAT, TSDataType.BOOLEAN),
new DescTimeComparator());
- long[] timeArray = new long[] {25L, 22L, 19L, 18L, 15L, 9L, 7L, 6L, 3L};
- int[] column1Array = new int[] {0, 22, 19, 18, 0, 0, 7, 0, 3};
- boolean[] column1IsNull =
- new boolean[] {true, false, false, false, true, true, false, true,
false};
- long[] column2Array = new long[] {26L, 0L, 20L, 0L, 16L, 0L, 0L, 7L, 4L};
- boolean[] column2IsNull =
- new boolean[] {false, true, false, true, false, true, true, false,
false};
- float[] column3Array = new float[] {0.0f, 0.0f, 190.0f, 180.0f, 0.0f,
0.0f, 0.0f, 0.0f, 30.0f};
- boolean[] column3IsNull =
- new boolean[] {true, true, false, false, true, true, true, true,
false};
- boolean[] column4Array =
- new boolean[] {false, false, true, false, false, false, false, false,
false};
- boolean[] column4IsNull =
- new boolean[] {true, true, false, true, false, true, false, true,
false};
+ long[] timeArray = new long[] {100L, 90L, 50L, 20L, 10L};
+ int[] column1Array = new int[] {100, 90, 50, 20, 10};
+ boolean[] column1IsNull = new boolean[] {false, false, false, false,
false};
+ long[] column2Array = new long[] {200L, 180L, 100L, 40L, 20L};
+ boolean[] column2IsNull = new boolean[] {false, false, false, false,
false};
+ float[] column3Array = new float[] {300.0f, 0.0f, 150.0f, 60.0f, 0.0f};
+ boolean[] column3IsNull = new boolean[] {false, true, false, false, true};
+ boolean[] column4Array = new boolean[] {false, false, true, false, false};
+ boolean[] column4IsNull = new boolean[] {true, false, false, true, false};
try {
int count = 0;
- while (leftOuterTimeJoinOperator.hasNext()) {
- TsBlock tsBlock = leftOuterTimeJoinOperator.next();
+ ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
+ listenableFuture.get();
+ while (!innerTimeJoinOperator.isFinished() &&
innerTimeJoinOperator.hasNext()) {
+ TsBlock tsBlock = innerTimeJoinOperator.next();
if (tsBlock != null && !tsBlock.isEmpty()) {
for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
@@ -612,6 +877,8 @@ public class InnerTimeJoinOperatorTest {
}
}
}
+ listenableFuture = innerTimeJoinOperator.isBlocked();
+ listenableFuture.get();
}
assertEquals(timeArray.length, count);
} catch (Exception e) {
@@ -619,4 +886,360 @@ public class InnerTimeJoinOperatorTest {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testInnerJoin4() {
+ // child-1
+ // Time, s1, s2
+ // 100 100 200
+ // 90 90 180
+ // 80 80 160
+ // 70 70 140
+ // 60 60 120
+ // 50 50 100
+ // 40 40 80
+ // 30 30 60
+ // 20 20 40
+ // 10 10 20
+ // 0 0 0
+ // ---------------------- TsBlock-1
+
+ // child-2
+ // Time, s3, s4
+ // 1000 3000.0 false
+ // 500 500.0 true
+ // 100 300.0 null
+ // ------------------------- TsBlock-1
+ // 99 99.0 true
+ // 95 95.0 null
+ // 90 null false
+ // ------------------------- TsBlock-2
+ // 50 150.0 true
+ // 48 48.0 true
+ // 20 60.0 null
+ // 10 null false
+ // ------------------------- TsBlock-3
+
+ // child-3
+ // Time, s5
+ // 1000 "iotdb"
+ // 500 "ty"
+ // 101 "zm"
+ // --------------------- TsBlock-1
+ // 99 "ty"
+ // 95 "love"
+ // 80 "zm"
+ // 60 "2018-05-06"
+ // --------------------- TsBlock-2
+ // 40 "1997-09-09"
+ // 22 "1995-04-21"
+ // 11 "2022-04-21"
+ // 0 "2023-12-30"
+ // --------------------- TsBlock-3
+
+ // result table
+ // Time, s1, s2, s3, s4, s5
+ // empty (no timestamp is present in all three children)
+
+ OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+ Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new
Duration(1000, TimeUnit.SECONDS));
+
+ Operator child1 =
+ new Operator() {
+ private final long[][] timeArray =
+ new long[][] {{100L, 90L, 80L, 70L, 60L, 50L, 40L, 30L, 20L,
10L, 0L}};
+
+ private final int[][] value1Array =
+ new int[][] {{100, 90, 80, 70, 60, 50, 40, 30, 20, 10, 0}};
+
+ private final long[][] value2Array =
+ new long[][] {{200L, 180L, 160, 140, 120, 100L, 80L, 60L, 40L,
20L, 0L}};
+
+ private final boolean[][][] valueIsNull =
+ new boolean[][][] {
+ {{false, false, false, false, false, false, false, false,
false, false, false}},
+ {{false, false, false, false, false, false, false, false,
false, false, false}}
+ };
+
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() {
+ if (timeArray[index] == null) {
+ index++;
+ return null;
+ }
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ timeArray[index].length, Arrays.asList(TSDataType.INT32,
TSDataType.INT64));
+ for (int i = 0, size = timeArray[index].length; i < size; i++) {
+ builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+ if (valueIsNull[0][index][i]) {
+ builder.getColumnBuilder(0).appendNull();
+ } else {
+ builder.getColumnBuilder(0).writeInt(value1Array[index][i]);
+ }
+ if (valueIsNull[1][index][i]) {
+ builder.getColumnBuilder(1).appendNull();
+ } else {
+ builder.getColumnBuilder(1).writeLong(value2Array[index][i]);
+ }
+ }
+ builder.declarePositions(timeArray[index].length);
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 1;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isFinished() {
+ return index >= 1;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+ };
+
+ Operator child2 =
+ new Operator() {
+ private final long[][] timeArray =
+ new long[][] {{1000L, 500L, 100L}, {99L, 95L, 90L}, {50L, 48L,
20L, 10L}};
+
+ private final float[][] value1Array =
+ new float[][] {
+ {3000.0f, 500.0f, 300.0f},
+ {99.0f, 95.0f, 0.0f},
+ {150.0f, 48.0f, 60.0f, 0.0f}
+ };
+
+ private final boolean[][] value2Array =
+ new boolean[][] {
+ {false, true, false},
+ {true, false, false},
+ {true, true, false, false}
+ };
+
+ private final boolean[][][] valueIsNull =
+ new boolean[][][] {
+ {
+ {false, false, false},
+ {false, false, true},
+ {false, false, false, true}
+ },
+ {
+ {false, false, true},
+ {false, true, false},
+ {false, false, true, false}
+ }
+ };
+
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() {
+ if (timeArray[index] == null) {
+ index++;
+ return null;
+ }
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ timeArray[index].length, Arrays.asList(TSDataType.FLOAT,
TSDataType.BOOLEAN));
+ for (int i = 0, size = timeArray[index].length; i < size; i++) {
+ builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+ if (valueIsNull[0][index][i]) {
+ builder.getColumnBuilder(0).appendNull();
+ } else {
+ builder.getColumnBuilder(0).writeFloat(value1Array[index][i]);
+ }
+ if (valueIsNull[1][index][i]) {
+ builder.getColumnBuilder(1).appendNull();
+ } else {
+
builder.getColumnBuilder(1).writeBoolean(value2Array[index][i]);
+ }
+ }
+ builder.declarePositions(timeArray[index].length);
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 3;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isFinished() {
+ return index >= 3;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+ };
+
+ Operator child3 =
+ new Operator() {
+ private final long[][] timeArray =
+ new long[][] {{1000L, 500L, 101L}, {99L, 95L, 80L, 60L}, {40L,
22L, 11L, 0L}};
+
+ private final Binary[][] value1Array =
+ new Binary[][] {
+ {
+ new Binary("iotdb".getBytes(StandardCharsets.UTF_8)),
+ new Binary("ty".getBytes(StandardCharsets.UTF_8)),
+ new Binary("zm".getBytes(StandardCharsets.UTF_8))
+ },
+ {
+ new Binary("ty".getBytes(StandardCharsets.UTF_8)),
+ new Binary("love".getBytes(StandardCharsets.UTF_8)),
+ new Binary("zm".getBytes(StandardCharsets.UTF_8)),
+ new Binary("2018-05-06".getBytes(StandardCharsets.UTF_8))
+ },
+ {
+ new Binary("1997-09-09".getBytes(StandardCharsets.UTF_8)),
+ new Binary("1995-04-21".getBytes(StandardCharsets.UTF_8)),
+ new Binary("2022-04-21".getBytes(StandardCharsets.UTF_8)),
+ new Binary("2023-12-30".getBytes(StandardCharsets.UTF_8))
+ }
+ };
+
+ private final boolean[][][] valueIsNull =
+ new boolean[][][] {
+ {
+ {false, false, false},
+ {false, false, false, false},
+ {false, false, false, false}
+ }
+ };
+
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() {
+ if (timeArray[index] == null) {
+ index++;
+ return null;
+ }
+ TsBlockBuilder builder =
+ new TsBlockBuilder(timeArray[index].length,
Arrays.asList(TSDataType.TEXT));
+ for (int i = 0, size = timeArray[index].length; i < size; i++) {
+ builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+ if (valueIsNull[0][index][i]) {
+ builder.getColumnBuilder(0).appendNull();
+ } else {
+ builder.getColumnBuilder(0).writeBinary(value1Array[index][i]);
+ }
+ }
+ builder.declarePositions(timeArray[index].length);
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 3;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isFinished() {
+ return index >= 3;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+ };
+
+ InnerTimeJoinOperator innerTimeJoinOperator =
+ new InnerTimeJoinOperator(
+ operatorContext,
+ Arrays.asList(child1, child2, child3),
+ Arrays.asList(
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT),
+ new DescTimeComparator());
+
+ try {
+ int count = 0;
+ ListenableFuture<?> listenableFuture = innerTimeJoinOperator.isBlocked();
+ listenableFuture.get();
+ while (!innerTimeJoinOperator.isFinished() &&
innerTimeJoinOperator.hasNext()) {
+ TsBlock tsBlock = innerTimeJoinOperator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ count += tsBlock.getPositionCount();
+ }
+ listenableFuture = innerTimeJoinOperator.isBlocked();
+ listenableFuture.get();
+ }
+ assertEquals(0, count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperatorTest.java
index a3488806834..bf50a037829 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import org.junit.Test;
import org.mockito.Mockito;
@@ -83,6 +84,9 @@ public class LeftOuterTimeJoinOperatorTest {
// 22 22, null
// 25 25, null
+ OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+ Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1,
TimeUnit.SECONDS));
+
Operator leftChild =
new Operator() {
private final long[][] timeArray =
@@ -112,7 +116,7 @@ public class LeftOuterTimeJoinOperatorTest {
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
@Override
@@ -194,7 +198,7 @@ public class LeftOuterTimeJoinOperatorTest {
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
@Override
@@ -244,9 +248,6 @@ public class LeftOuterTimeJoinOperatorTest {
}
};
- OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
- Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1,
TimeUnit.SECONDS));
-
LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
new LeftOuterTimeJoinOperator(
operatorContext,
@@ -272,7 +273,9 @@ public class LeftOuterTimeJoinOperatorTest {
try {
int count = 0;
- while (leftOuterTimeJoinOperator.hasNext()) {
+ ListenableFuture<?> listenableFuture =
leftOuterTimeJoinOperator.isBlocked();
+ listenableFuture.get();
+ while (!leftOuterTimeJoinOperator.isFinished() &&
leftOuterTimeJoinOperator.hasNext()) {
TsBlock tsBlock = leftOuterTimeJoinOperator.next();
if (tsBlock != null && !tsBlock.isEmpty()) {
for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
@@ -287,6 +290,8 @@ public class LeftOuterTimeJoinOperatorTest {
}
}
}
+ listenableFuture = leftOuterTimeJoinOperator.isBlocked();
+ listenableFuture.get();
}
assertEquals(timeArray.length, count);
} catch (Exception e) {
@@ -358,6 +363,9 @@ public class LeftOuterTimeJoinOperatorTest {
// 6 null 7 null null
// 3 3 4 30.0 false
+ OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+ Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1,
TimeUnit.SECONDS));
+
Operator leftChild =
new Operator() {
private final long[][] timeArray =
@@ -379,7 +387,7 @@ public class LeftOuterTimeJoinOperatorTest {
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
@Override
@@ -501,7 +509,7 @@ public class LeftOuterTimeJoinOperatorTest {
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
@Override
@@ -560,9 +568,6 @@ public class LeftOuterTimeJoinOperatorTest {
}
};
- OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
- Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1,
TimeUnit.SECONDS));
-
LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
new LeftOuterTimeJoinOperator(
operatorContext,
@@ -589,7 +594,9 @@ public class LeftOuterTimeJoinOperatorTest {
try {
int count = 0;
- while (leftOuterTimeJoinOperator.hasNext()) {
+ ListenableFuture<?> listenableFuture =
leftOuterTimeJoinOperator.isBlocked();
+ listenableFuture.get();
+ while (!leftOuterTimeJoinOperator.isFinished() &&
leftOuterTimeJoinOperator.hasNext()) {
TsBlock tsBlock = leftOuterTimeJoinOperator.next();
if (tsBlock != null && !tsBlock.isEmpty()) {
for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
@@ -612,6 +619,8 @@ public class LeftOuterTimeJoinOperatorTest {
}
}
}
+ listenableFuture = leftOuterTimeJoinOperator.isBlocked();
+ listenableFuture.get();
}
assertEquals(timeArray.length, count);
} catch (Exception e) {