This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TimeJoin
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TimeJoin by this push:
new 1fbc5251ff7 add UT for LeftOuterTimeJoinOperator
1fbc5251ff7 is described below
commit 1fbc5251ff735ec0fa8b3c2cd5c75f90a6abfd73
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Dec 20 12:52:48 2023 +0800
add UT for LeftOuterTimeJoinOperator
---
.../process/join/LeftOuterTimeJoinOperator.java | 21 +-
.../join/LeftOuterTimeJoinOperatorTest.java | 622 +++++++++++++++++++++
2 files changed, 639 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
index 651c47d3c2c..b997484a1e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
@@ -138,6 +138,7 @@ public class LeftOuterTimeJoinOperator implements
ProcessOperator {
&& !resultBuilder.isFull()
&& appendRightTableRow(time)) {
timeColumnBuilder.writeLong(time);
+ resultBuilder.declarePosition();
// deal with leftTsBlock
appendLeftTableRow();
@@ -184,7 +185,13 @@ public class LeftOuterTimeJoinOperator implements
ProcessOperator {
private void appendLeftTableRow() {
for (int i = 0; i < leftColumnCount; i++) {
- resultBuilder.getColumnBuilder(i).write(leftTsBlock.getColumn(i),
leftIndex);
+ Column leftColumn = leftTsBlock.getColumn(i);
+ ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+ if (leftColumn.isNull(leftIndex)) {
+ columnBuilder.appendNull();
+ } else {
+ columnBuilder.write(leftColumn, leftIndex);
+ }
}
leftIndex++;
}
@@ -215,9 +222,13 @@ public class LeftOuterTimeJoinOperator implements
ProcessOperator {
if (rightTsBlock.getTimeByIndex(rightIndex) == time) {
// right table has this time, append right table's corresponding row
for (int i = leftColumnCount; i < outputColumnCount; i++) {
- resultBuilder
- .getColumnBuilder(i)
- .write(rightTsBlock.getColumn(i - leftColumnCount), rightIndex);
+ Column rightColumn = rightTsBlock.getColumn(i - leftColumnCount);
+ ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+ if (rightColumn.isNull(rightIndex)) {
+ columnBuilder.appendNull();
+ } else {
+ columnBuilder.write(rightColumn, rightIndex);
+ }
}
// update right Index
rightIndex++;
@@ -239,6 +250,8 @@ public class LeftOuterTimeJoinOperator implements
ProcessOperator {
timeColumnBuilder.writeLong(leftTimeColumn.getLong(i));
}
+ resultBuilder.declarePositions(rowSize - leftIndex);
+
// append value column of left table
appendValueColumnForLeftTable(rowSize);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperatorTest.java
new file mode 100644
index 00000000000..a3488806834
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperatorTest.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.join;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import io.airlift.units.Duration;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class LeftOuterTimeJoinOperatorTest {
+
+ @Test
+ public void testLeftOuterJoin1() {
+ // left table
+ // Time, s1
+ // 4 4
+ // 6 6
+ // 9 9
+ // ----------- TsBlock-1
+ // 13 13
+ // 17 17
+ // ----------- TsBlock-2
+ // 22 22
+ // 25 25
+ // ----------- TsBlock-3
+
+ // right table
+ // Time, s2
+ // 1 10
+ // 2 20
+ // 3 30
+ // ----------- TsBlock-1
+ // 4 40
+ // 5 50
+ // 10 100
+ // ----------- TsBlock-2
+ // 13 130
+ // 16 160
+ // ----------- TsBlock-3
+ // 26 260
+ // 27 270
+ // ----------- TsBlock-4
+
+ // result table
+ // Time, s1, s2
+ // 4, 4, 40
+ // 6, 6, null
+ // 9 9, null
+ // 13 13, 130
+ // 17 17, null
+ // 22 22, null
+ // 25 25, null
+
+ Operator leftChild =
+ new Operator() {
+ private final long[][] timeArray =
+ new long[][] {
+ {4L, 6L, 9L},
+ {13L, 17L},
+ {22L, 25L}
+ };
+
+ private final int[][] valueArray =
+ new int[][] {
+ {4, 6, 9},
+ {13, 17},
+ {22, 25}
+ };
+
+ private final boolean[][][] valueIsNull =
+ new boolean[][][] {
+ {
+ {false, false, false},
+ {false, false},
+ {false, false}
+ }
+ };
+
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ timeArray[index].length,
Collections.singletonList(TSDataType.INT32));
+ for (int i = 0, size = timeArray[index].length; i < size; i++) {
+ builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+ if (valueIsNull[0][index][i]) {
+ builder.getColumnBuilder(0).appendNull();
+ } else {
+ builder.getColumnBuilder(0).writeInt(valueArray[index][i]);
+ }
+ }
+ builder.declarePositions(timeArray[index].length);
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 3;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isFinished() {
+ return index >= 3;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+ };
+
+ Operator rightChild =
+ new Operator() {
+ private final long[][] timeArray =
+ new long[][] {
+ {1L, 2L, 3L},
+ {4L, 5L, 10L},
+ {13L, 16L},
+ {26L, 27L}
+ };
+
+ private final long[][] valueArray =
+ new long[][] {
+ {10L, 20L, 30L},
+ {40L, 50L, 100L},
+ {130L, 160L},
+ {260L, 270L}
+ };
+
+ private final boolean[][][] valueIsNull =
+ new boolean[][][] {
+ {
+ {false, false, false},
+ {false, false, false},
+ {false, false},
+ {false, false}
+ }
+ };
+
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ timeArray[index].length,
Collections.singletonList(TSDataType.INT64));
+ for (int i = 0, size = timeArray[index].length; i < size; i++) {
+ builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+ if (valueIsNull[0][index][i]) {
+ builder.getColumnBuilder(0).appendNull();
+ } else {
+ builder.getColumnBuilder(0).writeLong(valueArray[index][i]);
+ }
+ }
+ builder.declarePositions(timeArray[index].length);
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 4;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isFinished() {
+ return index >= 4;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+ };
+
+ OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+ Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1,
TimeUnit.SECONDS));
+
+ LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
+ new LeftOuterTimeJoinOperator(
+ operatorContext,
+ leftChild,
+ 1,
+ rightChild,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT64),
+ new AscTimeComparator());
+
+ assertEquals(
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes()
+ 64 * 1024 * 2,
+ leftOuterTimeJoinOperator.calculateMaxPeekMemory());
+ assertEquals(
+ TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
+ leftOuterTimeJoinOperator.calculateMaxReturnSize());
+ assertEquals(64 * 1024 * 2,
leftOuterTimeJoinOperator.calculateRetainedSizeAfterCallingNext());
+
+ long[] timeArray = new long[] {4L, 6L, 9L, 13L, 17L, 22L, 25L};
+ int[] column1Array = new int[] {4, 6, 9, 13, 17, 22, 25};
+ boolean[] column1IsNull = new boolean[] {false, false, false, false,
false, false, false};
+ long[] column2Array = new long[] {40L, 0L, 0L, 130L, 0L, 0L, 0L};
+ boolean[] column2IsNull = new boolean[] {false, true, true, false, true,
true, true};
+
+ try {
+ int count = 0;
+ while (leftOuterTimeJoinOperator.hasNext()) {
+ TsBlock tsBlock = leftOuterTimeJoinOperator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
+ assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+ assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+ if (!column1IsNull[count]) {
+ assertEquals(column1Array[count],
tsBlock.getColumn(0).getInt(i));
+ }
+ assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+ if (!column2IsNull[count]) {
+ assertEquals(column2Array[count],
tsBlock.getColumn(1).getLong(i));
+ }
+ }
+ }
+ }
+ assertEquals(timeArray.length, count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testLeftOuterJoin2() {
+ // left table
+ // Time, s1, s2
+ // 25 null 26
+ // 22 22 null
+ // ---------------------- TsBlock-1
+ // null
+ // ---------------------- TsBlock-2
+ // 19 19 20
+ // 18 18 null
+ // 15 null 16
+ // ---------------------- TsBlock-3
+ // empty
+ // ---------------------- TsBlock-4
+ // 9 null null
+ // 7 7 null
+ // 6 null 7
+ // 3 3 4
+ // ---------------------- TsBlock-5
+ // empty
+ // ---------------------- TsBlock-6
+
+ // right table
+ // Time, s3, s4
+ // 21 210.0 false
+ // 20 200.0 null
+ // ---------------------- TsBlock-1
+ // empty
+ // ---------------------- TsBlock-2
+ // 19 190.0 true
+ // 18 180.0 null
+ // 15 null false
+ // 14 null null
+ // 8 80.0 true
+ // 7 null false
+ // ---------------------- TsBlock-3
+ // null
+ // ---------------------- TsBlock-4
+ // 5 50.0 true
+ // ---------------------- TsBlock-5
+ // 4 40.0 null
+ // ---------------------- TsBlock-6
+ // 3 30.0 false
+ // ---------------------- TsBlock-7
+ // 2 20.0 true
+ // 1 10.0 false
+ // ---------------------- TsBlock-8
+ // empty
+ // ---------------------- TsBlock-9
+
+ // result table
+ // Time, s1, s2, s3, s4
+ // 25 null 26 null null
+ // 22 22 null null null
+ // 19 19 20 190.0 true
+ // 18 18 null 180.0 null
+ // 15 null 16 null false
+ // 9 null null null null
+ // 7 7 null null false
+ // 6 null 7 null null
+ // 3 3 4 30.0 false
+
+ Operator leftChild =
+ new Operator() {
+ private final long[][] timeArray =
+ new long[][] {{25L, 22L}, null, {19L, 18L, 15L}, {}, {9L, 7L,
6L, 3L}, {}};
+
+ private final int[][] value1Array =
+ new int[][] {{0, 22}, null, {19, 18, 0}, {}, {0, 7, 0, 3}, {}};
+
+ private final long[][] value2Array =
+ new long[][] {{26L, 0L}, null, {20L, 0L, 16L}, {}, {0L, 0L, 7L,
4L}, {}};
+
+ private final boolean[][][] valueIsNull =
+ new boolean[][][] {
+ {{true, false}, null, {false, false, true}, {}, {true, false,
true, false}, {}},
+ {{false, true}, null, {false, true, false}, {}, {true, true,
false, false}, {}}
+ };
+
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public TsBlock next() {
+ if (timeArray[index] == null) {
+ index++;
+ return null;
+ }
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ timeArray[index].length, Arrays.asList(TSDataType.INT32,
TSDataType.INT64));
+ for (int i = 0, size = timeArray[index].length; i < size; i++) {
+ builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+ if (valueIsNull[0][index][i]) {
+ builder.getColumnBuilder(0).appendNull();
+ } else {
+ builder.getColumnBuilder(0).writeInt(value1Array[index][i]);
+ }
+ if (valueIsNull[1][index][i]) {
+ builder.getColumnBuilder(1).appendNull();
+ } else {
+ builder.getColumnBuilder(1).writeLong(value2Array[index][i]);
+ }
+ }
+ builder.declarePositions(timeArray[index].length);
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 6;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isFinished() {
+ return index >= 6;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+ };
+
+ Operator rightChild =
+ new Operator() {
+ private final long[][] timeArray =
+ new long[][] {
+ {21L, 20L}, {}, {19L, 18L, 15L, 14L, 8L, 7L}, null, {5L},
{4L}, {3L}, {2L, 1L}, {}
+ };
+
+ private final float[][] value1Array =
+ new float[][] {
+ {210.0f, 200.0f},
+ {},
+ {190.0f, 180.0f, 0.0f, 0.0f, 80.0f, 0.0f},
+ null,
+ {50.0f},
+ {40.0f},
+ {30.0f},
+ {20.0f, 10.0f},
+ {}
+ };
+
+ private final boolean[][] value2Array =
+ new boolean[][] {
+ {false, false},
+ {},
+ {true, false, false, false, true, false},
+ null,
+ {true},
+ {false},
+ {false},
+ {true, false},
+ {}
+ };
+
+ private final boolean[][][] valueIsNull =
+ new boolean[][][] {
+ {
+ {false, false},
+ {},
+ {false, false, true, true, false, true},
+ null,
+ {false},
+ {false},
+ {false},
+ {false, false},
+ {}
+ },
+ {
+ {false, true},
+ {},
+ {false, true, false, true, false, false},
+ null,
+ {false},
+ {true},
+ {false},
+ {false, false},
+ {}
+ }
+ };
+
+ private int index = 0;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public TsBlock next() {
+ if (timeArray[index] == null) {
+ index++;
+ return null;
+ }
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ timeArray[index].length, Arrays.asList(TSDataType.FLOAT,
TSDataType.BOOLEAN));
+ for (int i = 0, size = timeArray[index].length; i < size; i++) {
+ builder.getTimeColumnBuilder().writeLong(timeArray[index][i]);
+ if (valueIsNull[0][index][i]) {
+ builder.getColumnBuilder(0).appendNull();
+ } else {
+ builder.getColumnBuilder(0).writeFloat(value1Array[index][i]);
+ }
+ if (valueIsNull[1][index][i]) {
+ builder.getColumnBuilder(1).appendNull();
+ } else {
+
builder.getColumnBuilder(1).writeBoolean(value2Array[index][i]);
+ }
+ }
+ builder.declarePositions(timeArray[index].length);
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 9;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isFinished() {
+ return index >= 9;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 64 * 1024;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+ };
+
+ OperatorContext operatorContext = Mockito.mock(OperatorContext.class);
+ Mockito.when(operatorContext.getMaxRunTime()).thenReturn(new Duration(1,
TimeUnit.SECONDS));
+
+ LeftOuterTimeJoinOperator leftOuterTimeJoinOperator =
+ new LeftOuterTimeJoinOperator(
+ operatorContext,
+ leftChild,
+ 2,
+ rightChild,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT64,
TSDataType.FLOAT, TSDataType.BOOLEAN),
+ new DescTimeComparator());
+
+ long[] timeArray = new long[] {25L, 22L, 19L, 18L, 15L, 9L, 7L, 6L, 3L};
+ int[] column1Array = new int[] {0, 22, 19, 18, 0, 0, 7, 0, 3};
+ boolean[] column1IsNull =
+ new boolean[] {true, false, false, false, true, true, false, true,
false};
+ long[] column2Array = new long[] {26L, 0L, 20L, 0L, 16L, 0L, 0L, 7L, 4L};
+ boolean[] column2IsNull =
+ new boolean[] {false, true, false, true, false, true, true, false,
false};
+ float[] column3Array = new float[] {0.0f, 0.0f, 190.0f, 180.0f, 0.0f,
0.0f, 0.0f, 0.0f, 30.0f};
+ boolean[] column3IsNull =
+ new boolean[] {true, true, false, false, true, true, true, true,
false};
+ boolean[] column4Array =
+ new boolean[] {false, false, true, false, false, false, false, false,
false};
+ boolean[] column4IsNull =
+ new boolean[] {true, true, false, true, false, true, false, true,
false};
+
+ try {
+ int count = 0;
+ while (leftOuterTimeJoinOperator.hasNext()) {
+ TsBlock tsBlock = leftOuterTimeJoinOperator.next();
+ if (tsBlock != null && !tsBlock.isEmpty()) {
+ for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++,
count++) {
+ assertEquals(timeArray[count], tsBlock.getTimeByIndex(i));
+ assertEquals(column1IsNull[count], tsBlock.getColumn(0).isNull(i));
+ if (!column1IsNull[count]) {
+ assertEquals(column1Array[count],
tsBlock.getColumn(0).getInt(i));
+ }
+ assertEquals(column2IsNull[count], tsBlock.getColumn(1).isNull(i));
+ if (!column2IsNull[count]) {
+ assertEquals(column2Array[count],
tsBlock.getColumn(1).getLong(i));
+ }
+ assertEquals(column3IsNull[count], tsBlock.getColumn(2).isNull(i));
+ if (!column3IsNull[count]) {
+ assertEquals(column3Array[count],
tsBlock.getColumn(2).getFloat(i), 0.000001);
+ }
+ assertEquals(column4IsNull[count], tsBlock.getColumn(3).isNull(i));
+ if (!column4IsNull[count]) {
+ assertEquals(column4Array[count],
tsBlock.getColumn(3).getBoolean(i));
+ }
+ }
+ }
+ }
+ assertEquals(timeArray.length, count);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}