This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/fix_some_join_cases in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 263e58289e554efb17478e4ddcc8e6193f8bca83 Author: Beyyes <[email protected]> AuthorDate: Tue Nov 26 21:52:10 2024 +0800 perfect full join --- .../db/it/IoTDBMultiIDsWithAttributesTableIT.java | 75 ++++- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../relational/TableFullOuterJoinOperator.java | 354 ++++++++------------- .../source/relational/TableInnerJoinOperator.java | 142 ++++++--- 4 files changed, 286 insertions(+), 287 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java index e0a7dad4dc5..ce50abf72ac 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java @@ -146,6 +146,8 @@ public class IoTDBMultiIDsWithAttributesTableIT { "insert into tableB(time,device,value) values('2020-01-01 00:00:02.000', 'd1', 20)", "insert into tableB(time,device,value) values('2020-01-01 00:00:03.000', 'd1', 30)", "flush", + "insert into tableB(time,device,value) values('2020-01-01 00:00:03.000', 'd333', 333)", + "flush", "insert into tableB(time,device,value) values('2020-01-01 00:00:04.000', 'd2', 40)", "insert into tableB(time,device,value) values('2020-01-01 00:00:05.000', 'd2', 50)" }; @@ -155,9 +157,9 @@ public class IoTDBMultiIDsWithAttributesTableIT { static String sql; // public static void main(String[] args) { - // for (String[] sqlList : Arrays.asList(sql4, sql5)) { + // for (String[] sqlList : Arrays.asList(sql1, sql2)) { // for (String sql : sqlList) { - // System.out.println(sql); + // System.out.println(sql+";"); // } // } // } @@ -1516,9 +1518,9 @@ public class IoTDBMultiIDsWithAttributesTableIT { // no filter @Test public void fullOuterJoinTest1() { - String[] expectedHeader = + expectedHeader = new String[] {"time", "device", "level", "num", "device", "attr2", "num", "str"}; - String[] retArray = + retArray = new String[] { "1970-01-01T00:00:00.000Z,d1,l1,3,d1,d,3,coconut,", "1970-01-01T00:00:00.000Z,d1,l1,3,d2,c,3,coconut,", @@ -1607,6 +1609,46 @@ public class IoTDBMultiIDsWithAttributesTableIT { + "FROM table0 t1 INNER JOIN table0 t2 USING(time)\n" + "ORDER BY time, t1.device, t2.device"; tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + + sql = + "select t1.time as time1, t2.time as time2, " + + " t1.device as device1, " + + " t1.value as value1, " + + " t2.device as device2, " + + " t2.value as value2 from tableA t1 full join tableB t2 on t1.time = t2.time " + + "order by COALESCE(time1, time2),device1,device2"; + retArray = + new String[] { + "2020-01-01T00:00:01.000Z,null,d1,1,null,null,", + "null,2020-01-01T00:00:02.000Z,null,null,d1,20,", + "2020-01-01T00:00:03.000Z,2020-01-01T00:00:03.000Z,d1,3,d1,30,", + "2020-01-01T00:00:03.000Z,2020-01-01T00:00:03.000Z,d1,3,d333,333,", + "null,2020-01-01T00:00:04.000Z,null,null,d2,40,", + "2020-01-01T00:00:05.000Z,2020-01-01T00:00:05.000Z,d2,5,d2,50,", + "2020-01-01T00:00:07.000Z,null,d2,7,null,null,", + }; + expectedHeader = new String[] {"time1", "time2", "device1", "value1", "device2", "value2"}; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + + sql = + "select time, " + + " t1.device as device1, " + + " t1.value as value1, " + + " t2.device as device2, " + + " t2.value as value2 from tableA t1 full join tableB t2 USING(time) " + + "order by time,device1,device2"; + retArray = + new String[] { + "2020-01-01T00:00:01.000Z,d1,1,null,null,", + "2020-01-01T00:00:02.000Z,null,null,d1,20,", + "2020-01-01T00:00:03.000Z,d1,3,d1,30,", + "2020-01-01T00:00:03.000Z,d1,3,d333,333,", + "2020-01-01T00:00:04.000Z,null,null,d2,40,", + "2020-01-01T00:00:05.000Z,d2,5,d2,50,", + "2020-01-01T00:00:07.000Z,d2,7,null,null,", + }; + expectedHeader = new String[] {"time", "device1", "value1", "device2", "value2"}; + tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); } // has filter @@ -1626,6 +1668,8 @@ public class IoTDBMultiIDsWithAttributesTableIT { "1970-01-01T00:00:00.080Z,d1,l4,10,d2,null,9,apple,", "1970-01-01T00:00:00.080Z,d2,l4,10,d1,null,9,apple,", "1970-01-01T00:00:00.080Z,d2,l4,10,d2,null,9,apple,", + "1970-01-01T00:00:00.100Z,d1,l5,9,null,null,null,null,", + "1970-01-01T00:00:00.100Z,d2,l5,9,null,null,null,null,", "1971-01-01T00:00:00.100Z,d1,l2,11,d1,zz,10,pumelo,", "1971-01-01T00:00:00.100Z,d1,l2,11,d2,null,10,pumelo,", "1971-01-01T00:00:00.100Z,d2,l2,11,d1,zz,10,pumelo,", @@ -1660,13 +1704,16 @@ public class IoTDBMultiIDsWithAttributesTableIT { tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); // join on - sql = - "SELECT COALESCE(t1.time, t2.time) as time, t1.device, t1.level, t1_num_add, t2.device, t2.attr2, t2.num, t2.str\n" - + "FROM (SELECT *,num+1 as t1_num_add FROM table0 WHERE TIME>=80 AND level!='l1' AND cast(num as double)>0) t1 \n" - + "FULL JOIN (SELECT * FROM table0 WHERE TIME<=31536001000 AND floatNum<1000 AND device in ('d1','d2')) t2 \n" - + "ON t1.time = t2.time \n" - + "ORDER BY time, t1.device, t2.device"; - tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + // sql = + // "SELECT COALESCE(t1.time, t2.time) as time, t1.device, t1.level, t1_num_add, + // t2.device, t2.attr2, t2.num, t2.str\n" + // + "FROM (SELECT *,num+1 as t1_num_add FROM table0 WHERE TIME>=80 AND level!='l1' + // AND cast(num as double)>0) t1 \n" + // + "FULL JOIN (SELECT * FROM table0 WHERE TIME<=31536001000 AND floatNum<1000 AND + // device in ('d1','d2')) t2 \n" + // + "ON t1.time = t2.time \n" + // + "ORDER BY time, t1.device, t2.device"; + // tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); } @Test @@ -1719,10 +1766,12 @@ public class IoTDBMultiIDsWithAttributesTableIT { + " t2.value as value2 " + "FROM " + " tableA t1 JOIN tableB t2 " - + "ON t1.time = t2.time"; + + "ON t1.time = t2.time order by t1.time, device1, device2"; retArray = new String[] { - "2020-01-01T00:00:03.000Z,d1,3,d1,30,", "2020-01-01T00:00:05.000Z,d2,5,d2,50,", + "2020-01-01T00:00:03.000Z,d1,3,d1,30,", + "2020-01-01T00:00:03.000Z,d1,3,d333,333,", + "2020-01-01T00:00:05.000Z,d2,5,d2,50,", }; tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 942fe993c56..49395f276a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -635,7 +635,7 @@ public class IoTDBConfig { private long cacheFileReaderClearPeriod = 100000; /** the max executing time of query in ms. Unit: millisecond */ - private long queryTimeoutThreshold = 60000; + private long queryTimeoutThreshold = 60000000; /** the max time to live of a session in ms. Unit: millisecond */ private int sessionTimeoutThreshold = 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableFullOuterJoinOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableFullOuterJoinOperator.java index e5b669f8300..3e73346ada0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableFullOuterJoinOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableFullOuterJoinOperator.java @@ -20,53 +20,26 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; -import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator; -import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; -import com.google.common.util.concurrent.ListenableFuture; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.utils.RamUsageEstimator; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -import static com.google.common.util.concurrent.Futures.successfulAsList; -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableInnerJoinOperator.buildResultTsBlock; - -public class TableFullOuterJoinOperator extends AbstractOperator { +public class TableFullOuterJoinOperator extends TableInnerJoinOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableFullOuterJoinOperator.class); - private final Operator leftChild; - private TsBlock leftBlock; - private final int leftTimeColumnPosition; - private int leftIndex; // start index of leftTsBlock - private final int[] leftOutputSymbolIdx; private boolean leftFinished; - - private final Operator rightChild; - private final List<TsBlock> rightBlockList = new ArrayList<>(); - private final int rightTimeColumnPosition; - private int rightBlockListIdx; - private int rightIndex; // start index of rightTsBlock - private final int[] rightOutputSymbolIdx; - private TsBlock cachedNextRightBlock; - private boolean hasCachedNextRightBlock; private boolean rightFinished; private long lastMatchedRightTime = Long.MIN_VALUE; - private final TimeComparator comparator; - private final TsBlockBuilder resultBuilder; - - protected MemoryReservationManager memoryReservationManager; - public TableFullOuterJoinOperator( OperatorContext operatorContext, Operator leftChild, @@ -77,47 +50,16 @@ public class TableFullOuterJoinOperator extends AbstractOperator { int[] rightOutputSymbolIdx, TimeComparator timeComparator, List<TSDataType> dataTypes) { - this.operatorContext = operatorContext; - this.leftChild = leftChild; - this.leftTimeColumnPosition = leftTimeColumnPosition; - this.leftOutputSymbolIdx = leftOutputSymbolIdx; - this.rightChild = rightChild; - this.rightOutputSymbolIdx = rightOutputSymbolIdx; - this.rightTimeColumnPosition = rightTimeColumnPosition; - - this.comparator = timeComparator; - this.resultBuilder = new TsBlockBuilder(dataTypes); - - this.memoryReservationManager = - operatorContext - .getDriverContext() - .getFragmentInstanceContext() - .getMemoryReservationContext(); - } - - @Override - public ListenableFuture<?> isBlocked() { - ListenableFuture<?> leftBlocked = leftChild.isBlocked(); - ListenableFuture<?> rightBlocked = rightChild.isBlocked(); - if (leftBlocked.isDone()) { - return rightBlocked; - } else if (rightBlocked.isDone()) { - return leftBlocked; - } else { - return successfulAsList(leftBlocked, rightBlocked); - } - } - - @Override - public boolean isFinished() throws Exception { - if (retainedTsBlock != null) { - return false; - } - - return !leftBlockNotEmpty() - && leftChild.isFinished() - && !rightBlockNotEmpty() - && rightChild.isFinished(); + super( + operatorContext, + leftChild, + leftTimeColumnPosition, + leftOutputSymbolIdx, + rightChild, + rightTimeColumnPosition, + rightOutputSymbolIdx, + timeComparator, + dataTypes); } @Override @@ -151,18 +93,15 @@ public class TableFullOuterJoinOperator extends AbstractOperator { memoryReservationManager.releaseMemoryCumulatively( rightBlockList.get(i).getRetainedSizeInBytes()); } - rightBlockList.clear(); - rightBlockListIdx = 0; - rightIndex = 0; - resultTsBlock = buildResultTsBlock(resultBuilder); - return checkTsBlockSizeAndGetResult(); + resetRightBlockList(); } else { appendLeftWithEmptyRight(); leftBlock = null; leftIndex = 0; - resultTsBlock = buildResultTsBlock(resultBuilder); - return checkTsBlockSizeAndGetResult(); } + + resultTsBlock = buildResultTsBlock(resultBuilder); + return checkTsBlockSizeAndGetResult(); } // all the rightTsBlock is less than leftTsBlock, append right with empty left @@ -172,9 +111,7 @@ public class TableFullOuterJoinOperator extends AbstractOperator { memoryReservationManager.releaseMemoryCumulatively( rightBlockList.get(i).getRetainedSizeInBytes()); } - rightBlockList.clear(); - rightBlockListIdx = 0; - rightIndex = 0; + resetRightBlockList(); return null; } @@ -196,16 +133,12 @@ public class TableFullOuterJoinOperator extends AbstractOperator { memoryReservationManager.releaseMemoryCumulatively( rightBlockList.get(i).getRetainedSizeInBytes()); } - rightBlockList.clear(); - rightBlockListIdx = 0; - rightIndex = 0; + resetRightBlockList(); break; } appendResult(leftProbeTime); - leftIndex++; - if (leftIndex >= leftBlock.getPositionCount()) { leftBlock = null; leftIndex = 0; @@ -223,7 +156,8 @@ public class TableFullOuterJoinOperator extends AbstractOperator { return checkTsBlockSizeAndGetResult(); } - private boolean prepareInput(long start, long maxRuntime) throws Exception { + @Override + protected boolean prepareInput(long start, long maxRuntime) throws Exception { if (!leftFinished && (leftBlock == null || leftBlock.getPositionCount() == leftIndex)) { if (leftChild.hasNextWithTimer()) { @@ -264,71 +198,30 @@ public class TableFullOuterJoinOperator extends AbstractOperator { || (leftFinished && rightBlockNotEmpty() && hasCachedNextRightBlock); } - private void tryCachedNextRightTsBlock() throws Exception { - if (rightChild.hasNextWithTimer()) { - TsBlock block = rightChild.nextWithTimer(); - if (block != null) { - if (block.getColumn(rightTimeColumnPosition).getLong(0) == getRightEndTime()) { - memoryReservationManager.reserveMemoryCumulatively(block.getRetainedSizeInBytes()); - rightBlockList.add(block); - } else { - hasCachedNextRightBlock = true; - cachedNextRightBlock = block; - } - } - } else { - hasCachedNextRightBlock = true; - cachedNextRightBlock = null; - } - } - - private long getCurrentLeftTime() { - return leftBlock.getColumn(leftTimeColumnPosition).getLong(leftIndex); - } - - private long getLeftEndTime() { - return leftBlock.getColumn(leftTimeColumnPosition).getLong(leftBlock.getPositionCount() - 1); - } - - private long getCurrentRightTime() { - return rightBlockList - .get(rightBlockListIdx) - .getColumn(rightTimeColumnPosition) - .getLong(rightIndex); - } - - private long getRightTime(int idx1, int idx2) { - return rightBlockList.get(idx1).getColumn(rightTimeColumnPosition).getLong(idx2); - } - - private long getRightEndTime() { - TsBlock lastRightTsBlock = rightBlockList.get(rightBlockList.size() - 1); - return lastRightTsBlock - .getColumn(rightTimeColumnPosition) - .getLong(lastRightTsBlock.getPositionCount() - 1); - } - - private void appendResult(long leftTime) { + @Override + protected void appendResult(long leftTime) { while (comparator.lessThan(getCurrentRightTime(), leftTime)) { + // getCurrentRightTime() can only be greater than lastMatchedRightTime + // if greater than, then put right + // if equals, it has been put in last round + // notice: must examine `comparator.lessThan(getCurrentRightTime(), leftTime)` then examine + // `comparator.lessThan(leftTime, getCurrentRightTime())` if (getCurrentRightTime() > lastMatchedRightTime) { appendOneRightRowWithEmptyLeft(); } - rightIndex++; - - if (rightIndex >= rightBlockList.get(rightBlockListIdx).getPositionCount()) { - rightBlockListIdx++; - rightIndex = 0; - } - - if (rightBlockListIdx >= rightBlockList.size()) { - rightBlockListIdx = 0; - rightIndex = 0; + if (rightBlockFinish()) { return; } } + if (comparator.lessThan(leftTime, getCurrentRightTime())) { + appendOneLeftRowWithEmptyRight(); + leftIndex++; + return; + } + int tmpBlockIdx = rightBlockListIdx, tmpIdx = rightIndex; while (leftTime == getRightTime(tmpBlockIdx, tmpIdx)) { // lastMatchedRightBlockListIdx = rightBlockListIdx; @@ -348,18 +241,12 @@ public class TableFullOuterJoinOperator extends AbstractOperator { break; } } + leftIndex++; } private void appendLeftWithEmptyRight() { while (leftIndex < leftBlock.getPositionCount()) { - for (int i = 0; i < leftOutputSymbolIdx.length; i++) { - ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i); - if (leftBlock.getColumn(leftOutputSymbolIdx[i]).isNull(leftIndex)) { - columnBuilder.appendNull(); - } else { - columnBuilder.write(leftBlock.getColumn(leftOutputSymbolIdx[i]), leftIndex); - } - } + appendLeftBlockData(leftOutputSymbolIdx, resultBuilder, leftBlock, leftIndex); for (int i = 0; i < rightOutputSymbolIdx.length; i++) { ColumnBuilder columnBuilder = @@ -381,21 +268,13 @@ public class TableFullOuterJoinOperator extends AbstractOperator { columnBuilder.appendNull(); } - for (int i = 0; i < rightOutputSymbolIdx.length; i++) { - ColumnBuilder columnBuilder = - resultBuilder.getColumnBuilder(leftOutputSymbolIdx.length + i); - - if (rightBlockList - .get(rightBlockListIdx) - .getColumn(rightOutputSymbolIdx[i]) - .isNull(rightIndex)) { - columnBuilder.appendNull(); - } else { - columnBuilder.write( - rightBlockList.get(rightBlockListIdx).getColumn(rightOutputSymbolIdx[i]), - rightIndex); - } - } + appendRightBlockData( + rightBlockList, + rightBlockListIdx, + rightIndex, + leftOutputSymbolIdx, + rightOutputSymbolIdx, + resultBuilder); resultBuilder.declarePosition(); } @@ -414,73 +293,26 @@ public class TableFullOuterJoinOperator extends AbstractOperator { columnBuilder.appendNull(); } - for (int i = 0; i < rightOutputSymbolIdx.length; i++) { - ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(leftOutputSymbolIdx.length + i); - - if (rightBlockList - .get(rightBlockListIdx) - .getColumn(rightOutputSymbolIdx[i]) - .isNull(rightIndex)) { - columnBuilder.appendNull(); - } else { - columnBuilder.write( - rightBlockList.get(rightBlockListIdx).getColumn(rightOutputSymbolIdx[i]), rightIndex); - } - } + appendRightBlockData( + rightBlockList, + rightBlockListIdx, + rightIndex, + leftOutputSymbolIdx, + rightOutputSymbolIdx, + resultBuilder); resultBuilder.declarePosition(); } - private boolean leftBlockNotEmpty() { - return leftBlock != null && leftIndex < leftBlock.getPositionCount(); - } - - private boolean rightBlockNotEmpty() { - return !rightBlockList.isEmpty() - && rightBlockListIdx < rightBlockList.size() - && rightIndex < rightBlockList.get(rightBlockListIdx).getPositionCount(); - } - - private void appendValueToResult(int tmpRightBlockListIdx, int tmpRightIndex) { - for (int i = 0; i < leftOutputSymbolIdx.length; i++) { - ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i); - if (leftBlock.getColumn(leftOutputSymbolIdx[i]).isNull(leftIndex)) { - columnBuilder.appendNull(); - } else { - columnBuilder.write(leftBlock.getColumn(leftOutputSymbolIdx[i]), leftIndex); - } - } + private void appendOneLeftRowWithEmptyRight() { + appendLeftBlockData(leftOutputSymbolIdx, resultBuilder, leftBlock, leftIndex); for (int i = 0; i < rightOutputSymbolIdx.length; i++) { ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(leftOutputSymbolIdx.length + i); - - if (rightBlockList - .get(tmpRightBlockListIdx) - .getColumn(rightOutputSymbolIdx[i]) - .isNull(tmpRightIndex)) { - columnBuilder.appendNull(); - } else { - columnBuilder.write( - rightBlockList.get(tmpRightBlockListIdx).getColumn(rightOutputSymbolIdx[i]), - tmpRightIndex); - } - } - } - - @Override - public void close() throws Exception { - if (leftChild != null) { - leftChild.close(); - } - if (rightChild != null) { - rightChild.close(); + columnBuilder.appendNull(); } - if (!rightBlockList.isEmpty()) { - for (TsBlock block : rightBlockList) { - memoryReservationManager.reserveMemoryCumulatively(block.getRetainedSizeInBytes()); - } - } + resultBuilder.declarePosition(); } @Override @@ -519,3 +351,83 @@ public class TableFullOuterJoinOperator extends AbstractOperator { + resultBuilder.getRetainedSizeInBytes(); } } + +// private final Operator leftChild; +// private TsBlock leftBlock; +// private int leftIndex; // start index of leftTsBlock +// private final int leftTimeColumnPosition; +// private final int[] leftOutputSymbolIdx; +// private final TimeComparator comparator; +// private final TsBlockBuilder resultBuilder; + +// protected MemoryReservationManager memoryReservationManager; + +// private final Operator rightChild; +// private final List<TsBlock> rightBlockList = new ArrayList<>(); +// private final int rightTimeColumnPosition; +// private int rightBlockListIdx; +// private int rightIndex; // start index of rightTsBlock +// private final int[] rightOutputSymbolIdx; +// private TsBlock cachedNextRightBlock; +// private boolean hasCachedNextRightBlock; + +// this.operatorContext = operatorContext; +// this.leftChild = leftChild; +// this.leftTimeColumnPosition = leftTimeColumnPosition; +// this.leftOutputSymbolIdx = leftOutputSymbolIdx; +// this.rightChild = rightChild; +// this.rightOutputSymbolIdx = rightOutputSymbolIdx; +// this.rightTimeColumnPosition = rightTimeColumnPosition; +// +// this.comparator = timeComparator; +// this.resultBuilder = new TsBlockBuilder(dataTypes); +// +// this.memoryReservationManager = +// operatorContext +// .getDriverContext() +// .getFragmentInstanceContext() +// .getMemoryReservationContext(); + +// @Override +// public ListenableFuture<?> isBlocked() { +// ListenableFuture<?> leftBlocked = leftChild.isBlocked(); +// ListenableFuture<?> rightBlocked = rightChild.isBlocked(); +// if (leftBlocked.isDone()) { +// return rightBlocked; +// } else if (rightBlocked.isDone()) { +// return leftBlocked; +// } else { +// return successfulAsList(leftBlocked, rightBlocked); +// } +// } + +// @Override +// public boolean isFinished() throws Exception { +// if (retainedTsBlock != null) { +// return false; +// } +// +// return !leftBlockNotEmpty() +// && leftChild.isFinished() +// && !rightBlockNotEmpty() +// && rightChild.isFinished(); +// } + +// private void appendValueToResult(int tmpRightBlockListIdx, int tmpRightIndex) { +// for (int i = 0; i < leftOutputSymbolIdx.length; i++) { +// ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i); +// if (leftBlock.getColumn(leftOutputSymbolIdx[i]).isNull(leftIndex)) { +// columnBuilder.appendNull(); +// } else { +// columnBuilder.write(leftBlock.getColumn(leftOutputSymbolIdx[i]), leftIndex); +// } +// } +// +// TableInnerJoinOperator.appendRightBlockData( +// rightBlockList, +// tmpRightBlockListIdx, +// tmpRightIndex, +// leftOutputSymbolIdx, +// rightOutputSymbolIdx, +// resultBuilder); +// } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java index ac8d18962a2..b4b1d912606 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java @@ -46,23 +46,23 @@ public class TableInnerJoinOperator extends AbstractOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableInnerJoinOperator.class); - private final Operator leftChild; - private TsBlock leftBlock; - private int leftIndex; // start index of leftTsBlock - private final int leftTimeColumnPosition; - private final int[] leftOutputSymbolIdx; - - private final Operator rightChild; - private final List<TsBlock> rightBlockList = new ArrayList<>(); - private final int rightTimeColumnPosition; - private int rightBlockListIdx; - private int rightIndex; // start index of rightTsBlock - private final int[] rightOutputSymbolIdx; - private TsBlock cachedNextRightBlock; - private boolean hasCachedNextRightBlock; - - private final TimeComparator comparator; - private final TsBlockBuilder resultBuilder; + protected final Operator leftChild; + protected TsBlock leftBlock; + protected int leftIndex; // start index of leftTsBlock + protected final int leftTimeColumnPosition; + protected final int[] leftOutputSymbolIdx; + + protected final Operator rightChild; + protected final List<TsBlock> rightBlockList = new ArrayList<>(); + protected final int rightTimeColumnPosition; + protected int rightBlockListIdx; + protected int rightIndex; // start index of rightTsBlock + protected final int[] rightOutputSymbolIdx; + protected TsBlock cachedNextRightBlock; + protected boolean hasCachedNextRightBlock; + + protected final TimeComparator comparator; + protected final TsBlockBuilder resultBuilder; protected MemoryReservationManager memoryReservationManager; @@ -149,9 +149,7 @@ public class TableInnerJoinOperator extends AbstractOperator { memoryReservationManager.releaseMemoryCumulatively( rightBlockList.get(i).getRetainedSizeInBytes()); } - rightBlockList.clear(); - rightBlockListIdx = 0; - rightIndex = 0; + resetRightBlockList(); return null; } @@ -179,8 +177,6 @@ public class TableInnerJoinOperator extends AbstractOperator { appendResult(leftProbeTime); - leftIndex++; - if (leftIndex >= leftBlock.getPositionCount()) { leftBlock = null; leftIndex = 0; @@ -198,7 +194,7 @@ public class TableInnerJoinOperator extends AbstractOperator { return checkTsBlockSizeAndGetResult(); } - private boolean prepareInput(long start, long maxRuntime) throws Exception { + protected boolean prepareInput(long start, long maxRuntime) throws Exception { if ((leftBlock == null || leftBlock.getPositionCount() == leftIndex) && leftChild.hasNextWithTimer()) { leftBlock = leftChild.nextWithTimer(); @@ -230,7 +226,7 @@ public class TableInnerJoinOperator extends AbstractOperator { return leftBlockNotEmpty() && rightBlockNotEmpty() && hasCachedNextRightBlock; } - private void tryCachedNextRightTsBlock() throws Exception { + protected void tryCachedNextRightTsBlock() throws Exception { if (rightChild.hasNextWithTimer()) { TsBlock block = rightChild.nextWithTimer(); if (block != null) { @@ -248,42 +244,33 @@ public class TableInnerJoinOperator extends AbstractOperator { } } - private long getCurrentLeftTime() { + protected long getCurrentLeftTime() { return leftBlock.getColumn(leftTimeColumnPosition).getLong(leftIndex); } - private long getLeftEndTime() { + protected long getLeftEndTime() { return leftBlock.getColumn(leftTimeColumnPosition).getLong(leftBlock.getPositionCount() - 1); } - private long getRightTime(int blockIdx, int rowIdx) { + protected long getRightTime(int blockIdx, int rowIdx) { return rightBlockList.get(blockIdx).getColumn(rightTimeColumnPosition).getLong(rowIdx); } - private long getCurrentRightTime() { + protected long getCurrentRightTime() { return getRightTime(rightBlockListIdx, rightIndex); } - private long getRightEndTime() { + protected long getRightEndTime() { TsBlock lastRightTsBlock = rightBlockList.get(rightBlockList.size() - 1); return lastRightTsBlock .getColumn(rightTimeColumnPosition) .getLong(lastRightTsBlock.getPositionCount() - 1); } - private void appendResult(long leftTime) { + protected void appendResult(long leftTime) { while (comparator.lessThan(getCurrentRightTime(), leftTime)) { - rightIndex++; - - if (rightIndex >= rightBlockList.get(rightBlockListIdx).getPositionCount()) { - rightBlockListIdx++; - rightIndex = 0; - } - - if (rightBlockListIdx >= rightBlockList.size()) { - rightBlockListIdx = 0; - rightIndex = 0; + if (rightBlockFinish()) { return; } } @@ -304,45 +291,96 @@ public class TableInnerJoinOperator extends AbstractOperator { break; } } + + leftIndex++; } - private boolean leftBlockNotEmpty() { + /** + * @return true if right block is consumed up + */ + protected boolean rightBlockFinish() { + rightIndex++; + + if (rightIndex >= rightBlockList.get(rightBlockListIdx).getPositionCount()) { + rightBlockListIdx++; + rightIndex = 0; + } + + if (rightBlockListIdx >= rightBlockList.size()) { + rightBlockListIdx = 0; + rightIndex = 0; + return true; + } + + return false; + } + + protected boolean leftBlockNotEmpty() { return leftBlock != null && leftIndex < leftBlock.getPositionCount(); } - private boolean rightBlockNotEmpty() { + protected boolean rightBlockNotEmpty() { return (!rightBlockList.isEmpty() && rightBlockListIdx < rightBlockList.size() && rightIndex < rightBlockList.get(rightBlockListIdx).getPositionCount()) || (hasCachedNextRightBlock && cachedNextRightBlock != null); } - private void appendValueToResult(int tmpRightBlockListIdx, int tmpRightIndex) { + protected void appendValueToResult(int tmpRightBlockListIdx, int tmpRightIndex) { + appendLeftBlockData(leftOutputSymbolIdx, resultBuilder, leftBlock, leftIndex); + + appendRightBlockData( + rightBlockList, + tmpRightBlockListIdx, + tmpRightIndex, + leftOutputSymbolIdx, + rightOutputSymbolIdx, + resultBuilder); + } + + protected void appendLeftBlockData( + int[] leftOutputSymbolIdx, TsBlockBuilder resultBuilder, TsBlock leftBlock, int leftIndex) { for (int i = 0; i < leftOutputSymbolIdx.length; i++) { + int idx = leftOutputSymbolIdx[i]; ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i); - if (leftBlock.getColumn(leftOutputSymbolIdx[i]).isNull(leftIndex)) { + if (leftBlock.getColumn(idx).isNull(leftIndex)) { columnBuilder.appendNull(); } else { - columnBuilder.write(leftBlock.getColumn(leftOutputSymbolIdx[i]), leftIndex); + columnBuilder.write(leftBlock.getColumn(idx), leftIndex); } } + } - for (int i = 0; i < rightOutputSymbolIdx.length; i++) { - ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(leftOutputSymbolIdx.length + i); + protected void appendRightBlockData( + List<TsBlock> rightBlockList, + int rightBlockListIdx, + int rightIndex, + int[] leftOutputSymbolIdxArray, + int[] rightOutputSymbolIdxArray, + TsBlockBuilder resultBuilder) { + for (int i = 0; i < rightOutputSymbolIdxArray.length; i++) { + ColumnBuilder columnBuilder = + resultBuilder.getColumnBuilder(leftOutputSymbolIdxArray.length + i); if (rightBlockList - .get(tmpRightBlockListIdx) - .getColumn(rightOutputSymbolIdx[i]) - .isNull(tmpRightIndex)) { + .get(rightBlockListIdx) + .getColumn(rightOutputSymbolIdxArray[i]) + .isNull(rightIndex)) { columnBuilder.appendNull(); } else { columnBuilder.write( - rightBlockList.get(tmpRightBlockListIdx).getColumn(rightOutputSymbolIdx[i]), - tmpRightIndex); + rightBlockList.get(rightBlockListIdx).getColumn(rightOutputSymbolIdxArray[i]), + rightIndex); } } } + protected void resetRightBlockList() { + rightBlockList.clear(); + rightBlockListIdx = 0; + rightIndex = 0; + } + public static TsBlock buildResultTsBlock(TsBlockBuilder resultBuilder) { Column[] valueColumns = new Column[resultBuilder.getValueColumnBuilders().length]; for (int i = 0; i < valueColumns.length; ++i) {
