This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6742d4bdf82 Add full outer time join for table model
6742d4bdf82 is described below
commit 6742d4bdf8235cb77dbffb12c67bff7b252a1f0d
Author: Beyyes <[email protected]>
AuthorDate: Fri Sep 27 10:10:48 2024 +0800
Add full outer time join for table model
---
.../db/it/IoTDBMultiIDsWithAttributesTableIT.java | 166 +++++++++++++
.../fragment/FragmentInstanceContext.java | 5 +-
...erator.java => TableFullOuterJoinOperator.java} | 268 +++++++++++++++------
...inOperator.java => TableInnerJoinOperator.java} | 116 +++++----
.../plan/planner/TableOperatorGenerator.java | 22 +-
.../relational/analyzer/StatementAnalyzer.java | 5 +-
.../plan/relational/planner/RelationPlanner.java | 17 +-
.../PushLimitOffsetIntoTableScan.java | 6 +-
.../optimizations/PushPredicateIntoTableScan.java | 9 +-
.../column/multi/CoalesceColumnTransformer.java | 1 +
.../plan/relational/analyzer/JoinTest.java | 10 +-
11 files changed, 465 insertions(+), 160 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
index 75793e62998..feda4776749 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
@@ -547,5 +547,171 @@ public class IoTDBMultiIDsWithAttributesTableIT {
+ "AND t2.time<=31536001000 AND t2.device in ('d1','d2')\n"
+ "ORDER BY t1.time, t1.device, t2.device LIMIT 20";
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ // join using
+ sql =
+ "SELECT time, t1.device, t1.level, t1_num_add, t2.device, t2.attr2,
t2.num, t2.str\n"
+ + "FROM (SELECT *,num+1 as t1_num_add FROM table0 WHERE time>=80)
t1 \n"
+ + "JOIN (SELECT * FROM table0 WHERE floatNum<1000) t2 USING(time)
\n"
+ + "WHERE cast(t1.num as double)>0 AND t1.level!='l1' \n"
+ + "AND time<=31536001000 AND t2.device in ('d1','d2')\n"
+ + "ORDER BY time, t1.device, t2.device LIMIT 20";
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+ }
+
+ // no filter
+ @Test
+ public void fullOuterJoinTest1() {
+ String[] expectedHeader =
+ new String[] {"time", "device", "level", "num", "device", "attr2",
"num", "str"};
+ String[] retArray =
+ new String[] {
+ "1970-01-01T00:00:00.000Z,d1,l1,3,d1,d,3,coconut,",
+ "1970-01-01T00:00:00.000Z,d1,l1,3,d2,c,3,coconut,",
+ "1970-01-01T00:00:00.000Z,d2,l1,3,d1,d,3,coconut,",
+ "1970-01-01T00:00:00.000Z,d2,l1,3,d2,c,3,coconut,",
+ "1970-01-01T00:00:00.020Z,d1,l2,2,d1,zz,2,pineapple,",
+ "1970-01-01T00:00:00.020Z,d1,l2,2,d2,null,2,pineapple,",
+ "1970-01-01T00:00:00.020Z,d2,l2,2,d1,zz,2,pineapple,",
+ "1970-01-01T00:00:00.020Z,d2,l2,2,d2,null,2,pineapple,",
+ "1970-01-01T00:00:00.040Z,d1,l3,1,d1,a,1,apricot,",
+ "1970-01-01T00:00:00.040Z,d1,l3,1,d2,null,1,apricot,",
+ "1970-01-01T00:00:00.040Z,d2,l3,1,d1,a,1,apricot,",
+ "1970-01-01T00:00:00.040Z,d2,l3,1,d2,null,1,apricot,",
+ "1970-01-01T00:00:00.080Z,d1,l4,9,d1,null,9,apple,",
+ "1970-01-01T00:00:00.080Z,d1,l4,9,d2,null,9,apple,",
+ "1970-01-01T00:00:00.080Z,d2,l4,9,d1,null,9,apple,",
+ "1970-01-01T00:00:00.080Z,d2,l4,9,d2,null,9,apple,",
+ "1970-01-01T00:00:00.100Z,d1,l5,8,d1,null,8,papaya,",
+ "1970-01-01T00:00:00.100Z,d1,l5,8,d2,null,8,papaya,",
+ "1970-01-01T00:00:00.100Z,d2,l5,8,d1,null,8,papaya,",
+ "1970-01-01T00:00:00.100Z,d2,l5,8,d2,null,8,papaya,",
+ "1971-01-01T00:00:00.000Z,d1,l1,6,d1,d,6,banana,",
+ "1971-01-01T00:00:00.000Z,d1,l1,6,d2,c,6,banana,",
+ "1971-01-01T00:00:00.000Z,d2,l1,6,d1,d,6,banana,",
+ "1971-01-01T00:00:00.000Z,d2,l1,6,d2,c,6,banana,",
+ "1971-01-01T00:00:00.100Z,d1,l2,10,d1,zz,10,pumelo,",
+ "1971-01-01T00:00:00.100Z,d1,l2,10,d2,null,10,pumelo,",
+ "1971-01-01T00:00:00.100Z,d2,l2,10,d1,zz,10,pumelo,",
+ "1971-01-01T00:00:00.100Z,d2,l2,10,d2,null,10,pumelo,",
+ "1971-01-01T00:00:00.500Z,d1,l3,4,d1,a,4,peach,",
+ "1971-01-01T00:00:00.500Z,d1,l3,4,d2,null,4,peach,",
+ "1971-01-01T00:00:00.500Z,d2,l3,4,d1,a,4,peach,",
+ "1971-01-01T00:00:00.500Z,d2,l3,4,d2,null,4,peach,",
+ "1971-01-01T00:00:01.000Z,d1,l4,5,d1,null,5,orange,",
+ "1971-01-01T00:00:01.000Z,d1,l4,5,d2,null,5,orange,",
+ "1971-01-01T00:00:01.000Z,d2,l4,5,d1,null,5,orange,",
+ "1971-01-01T00:00:01.000Z,d2,l4,5,d2,null,5,orange,",
+ "1971-01-01T00:00:10.000Z,d1,l5,7,d1,null,7,lemon,",
+ "1971-01-01T00:00:10.000Z,d1,l5,7,d2,null,7,lemon,",
+ "1971-01-01T00:00:10.000Z,d2,l5,7,d1,null,7,lemon,",
+ "1971-01-01T00:00:10.000Z,d2,l5,7,d2,null,7,lemon,",
+ "1971-01-01T00:01:40.000Z,d1,l1,11,d1,d,11,pitaya,",
+ "1971-01-01T00:01:40.000Z,d1,l1,11,d2,c,11,pitaya,",
+ "1971-01-01T00:01:40.000Z,d2,l1,11,d1,d,11,pitaya,",
+ "1971-01-01T00:01:40.000Z,d2,l1,11,d2,c,11,pitaya,",
+ "1971-04-26T17:46:40.000Z,d1,l2,12,d1,zz,12,strawberry,",
+ "1971-04-26T17:46:40.000Z,d1,l2,12,d2,null,12,strawberry,",
+ "1971-04-26T17:46:40.000Z,d2,l2,12,d1,zz,12,strawberry,",
+ "1971-04-26T17:46:40.000Z,d2,l2,12,d2,null,12,strawberry,",
+ "1971-04-26T17:46:40.020Z,d1,l3,14,d1,a,14,cherry,",
+ "1971-04-26T17:46:40.020Z,d1,l3,14,d2,null,14,cherry,",
+ "1971-04-26T17:46:40.020Z,d2,l3,14,d1,a,14,cherry,",
+ "1971-04-26T17:46:40.020Z,d2,l3,14,d2,null,14,cherry,",
+ "1971-04-26T18:01:40.000Z,d1,l4,13,d1,null,13,lychee,",
+ "1971-04-26T18:01:40.000Z,d1,l4,13,d2,null,13,lychee,",
+ "1971-04-26T18:01:40.000Z,d2,l4,13,d1,null,13,lychee,",
+ "1971-04-26T18:01:40.000Z,d2,l4,13,d2,null,13,lychee,",
+ "1971-08-20T11:33:20.000Z,d1,l5,15,d1,null,15,watermelon,",
+ "1971-08-20T11:33:20.000Z,d1,l5,15,d2,null,15,watermelon,",
+ "1971-08-20T11:33:20.000Z,d2,l5,15,d1,null,15,watermelon,",
+ "1971-08-20T11:33:20.000Z,d2,l5,15,d2,null,15,watermelon,",
+ };
+
+ // join on
+ String sql =
+ "SELECT t1.time as time, t1.device, t1.level, t1.num, t2.device,
t2.attr2, t2.num, t2.str\n"
+ + "FROM table0 t1 FULL JOIN table0 t2 ON t1.time = t2.time \n"
+ + "ORDER BY t1.time, t1.device, t2.device";
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ sql =
+ "SELECT t1.time as time, t1.device, t1.level, t1.num, t2.device,
t2.attr2, t2.num, t2.str\n"
+ + "FROM table0 t1 INNER JOIN table0 t2 ON t1.time = t2.time \n"
+ + "ORDER BY t1.time, t1.device, t2.device";
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ // join using
+ sql =
+ "SELECT time, t1.device, t1.level, t1.num, t2.device, t2.attr2,
t2.num, t2.str\n"
+ + "FROM table0 t1 FULL OUTER JOIN table0 t2 USING(time)\n"
+ + "ORDER BY time, t1.device, t2.device";
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ sql =
+ "SELECT time, t1.device, t1.level, t1.num, t2.device, t2.attr2,
t2.num, t2.str\n"
+ + "FROM table0 t1 INNER JOIN table0 t2 USING(time)\n"
+ + "ORDER BY time, t1.device, t2.device";
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+ }
+
+ // has filter
+ @Test
+ public void fullOuterJoinTest2() {
+ String[] expectedHeader =
+ new String[] {"time", "device", "level", "t1_num_add", "device",
"attr2", "num", "str"};
+ String[] retArray =
+ new String[] {
+ "1970-01-01T00:00:00.000Z,null,null,null,d1,d,3,coconut,",
+ "1970-01-01T00:00:00.000Z,null,null,null,d2,c,3,coconut,",
+ "1970-01-01T00:00:00.020Z,null,null,null,d1,zz,2,pineapple,",
+ "1970-01-01T00:00:00.020Z,null,null,null,d2,null,2,pineapple,",
+ "1970-01-01T00:00:00.040Z,null,null,null,d1,a,1,apricot,",
+ "1970-01-01T00:00:00.040Z,null,null,null,d2,null,1,apricot,",
+ "1970-01-01T00:00:00.080Z,d1,l4,10,d1,null,9,apple,",
+ "1970-01-01T00:00:00.080Z,d1,l4,10,d2,null,9,apple,",
+ "1970-01-01T00:00:00.080Z,d2,l4,10,d1,null,9,apple,",
+ "1970-01-01T00:00:00.080Z,d2,l4,10,d2,null,9,apple,",
+ "1971-01-01T00:00:00.100Z,d1,l2,11,d1,zz,10,pumelo,",
+ "1971-01-01T00:00:00.100Z,d1,l2,11,d2,null,10,pumelo,",
+ "1971-01-01T00:00:00.100Z,d2,l2,11,d1,zz,10,pumelo,",
+ "1971-01-01T00:00:00.100Z,d2,l2,11,d2,null,10,pumelo,",
+ "1971-01-01T00:00:00.500Z,d1,l3,5,d1,a,4,peach,",
+ "1971-01-01T00:00:00.500Z,d1,l3,5,d2,null,4,peach,",
+ "1971-01-01T00:00:00.500Z,d2,l3,5,d1,a,4,peach,",
+ "1971-01-01T00:00:00.500Z,d2,l3,5,d2,null,4,peach,",
+ "1971-01-01T00:00:01.000Z,d1,l4,6,d1,null,5,orange,",
+ "1971-01-01T00:00:01.000Z,d1,l4,6,d2,null,5,orange,",
+ "1971-01-01T00:00:01.000Z,d2,l4,6,d1,null,5,orange,",
+ "1971-01-01T00:00:01.000Z,d2,l4,6,d2,null,5,orange,",
+ "1971-01-01T00:00:10.000Z,d1,l5,8,null,null,null,null,",
+ "1971-01-01T00:00:10.000Z,d2,l5,8,null,null,null,null,",
+ "1971-04-26T17:46:40.000Z,d1,l2,13,null,null,null,null,",
+ "1971-04-26T17:46:40.000Z,d2,l2,13,null,null,null,null,",
+ "1971-04-26T17:46:40.020Z,d1,l3,15,null,null,null,null,",
+ "1971-04-26T17:46:40.020Z,d2,l3,15,null,null,null,null,",
+ "1971-04-26T18:01:40.000Z,d1,l4,14,null,null,null,null,",
+ "1971-04-26T18:01:40.000Z,d2,l4,14,null,null,null,null,",
+ "1971-08-20T11:33:20.000Z,d1,l5,16,null,null,null,null,",
+ "1971-08-20T11:33:20.000Z,d2,l5,16,null,null,null,null,",
+ };
+
+ // join using
+ String sql =
+ "SELECT time, t1.device, t1.level, t1_num_add, t2.device, t2.attr2,
t2.num, t2.str\n"
+ + "FROM (SELECT *,num+1 as t1_num_add FROM table0 WHERE TIME>=80
AND level!='l1' AND cast(num as double)>0) t1 \n"
+ + "FULL JOIN (SELECT * FROM table0 WHERE TIME<=31536001000 AND
floatNum<1000 AND device in ('d1','d2')) t2 \n"
+ + "USING(time) \n"
+ + "ORDER BY time, t1.device, t2.device";
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+ // join on
+ sql =
+ "SELECT COALESCE(t1.time, t2.time) as time, t1.device, t1.level,
t1_num_add, t2.device, t2.attr2, t2.num, t2.str\n"
+ + "FROM (SELECT *,num+1 as t1_num_add FROM table0 WHERE TIME>=80
AND level!='l1' AND cast(num as double)>0) t1 \n"
+ + "FULL JOIN (SELECT * FROM table0 WHERE TIME<=31536001000 AND
floatNum<1000 AND device in ('d1','d2')) t2 \n"
+ + "ON t1.time = t2.time \n"
+ + "ORDER BY time, t1.device, t2.device";
+ tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index a2a3021a51b..94917b12cd6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -406,9 +406,8 @@ public class FragmentInstanceContext extends QueryContext {
if (globalTimeFilter == null) {
globalTimeFilter = timeFilter;
} else {
- // In join case, there may exist more than one table and time filter,
join criteria only
- // support and condition, so use and to connect these two filters
- globalTimeFilter = FilterFactory.and(globalTimeFilter, timeFilter);
+ // In join case, there may exist more than one table and time filter
+ globalTimeFilter = FilterFactory.or(globalTimeFilter, timeFilter);
// throw new IllegalStateException(
// "globalTimeFilter in FragmentInstanceContext should only be set
once in Table Model!");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InnerJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableFullOuterJoinOperator.java
similarity index 61%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InnerJoinOperator.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableFullOuterJoinOperator.java
index c086666ece5..e5b669f8300 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InnerJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableFullOuterJoinOperator.java
@@ -20,20 +20,17 @@
package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
import org.apache.tsfile.utils.RamUsageEstimator;
import java.util.ArrayList;
@@ -41,49 +38,52 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.successfulAsList;
-import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
+import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableInnerJoinOperator.buildResultTsBlock;
-public class InnerJoinOperator implements ProcessOperator {
+public class TableFullOuterJoinOperator extends AbstractOperator {
private static final long INSTANCE_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(InnerJoinOperator.class);
-
- private final OperatorContext operatorContext;
+
RamUsageEstimator.shallowSizeOfInstance(TableFullOuterJoinOperator.class);
private final Operator leftChild;
private TsBlock leftBlock;
+ private final int leftTimeColumnPosition;
private int leftIndex; // start index of leftTsBlock
- private static final int TIME_COLUMN_POSITION = 0;
private final int[] leftOutputSymbolIdx;
+ private boolean leftFinished;
private final Operator rightChild;
private final List<TsBlock> rightBlockList = new ArrayList<>();
+ private final int rightTimeColumnPosition;
private int rightBlockListIdx;
private int rightIndex; // start index of rightTsBlock
private final int[] rightOutputSymbolIdx;
private TsBlock cachedNextRightBlock;
private boolean hasCachedNextRightBlock;
+ private boolean rightFinished;
+ private long lastMatchedRightTime = Long.MIN_VALUE;
private final TimeComparator comparator;
private final TsBlockBuilder resultBuilder;
- private final long maxReturnSize =
- TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
-
protected MemoryReservationManager memoryReservationManager;
- public InnerJoinOperator(
+ public TableFullOuterJoinOperator(
OperatorContext operatorContext,
Operator leftChild,
+ int leftTimeColumnPosition,
int[] leftOutputSymbolIdx,
Operator rightChild,
+ int rightTimeColumnPosition,
int[] rightOutputSymbolIdx,
TimeComparator timeComparator,
List<TSDataType> dataTypes) {
this.operatorContext = operatorContext;
this.leftChild = leftChild;
+ this.leftTimeColumnPosition = leftTimeColumnPosition;
this.leftOutputSymbolIdx = leftOutputSymbolIdx;
this.rightChild = rightChild;
this.rightOutputSymbolIdx = rightOutputSymbolIdx;
+ this.rightTimeColumnPosition = rightTimeColumnPosition;
this.comparator = timeComparator;
this.resultBuilder = new TsBlockBuilder(dataTypes);
@@ -108,14 +108,35 @@ public class InnerJoinOperator implements ProcessOperator
{
}
}
+  /**
+   * Reports whether this join has nothing more to produce.
+   *
+   * <p>A pending {@code retainedTsBlock} always means "not finished". Otherwise the operator is
+   * finished only when {@code leftBlockNotEmpty()} and {@code rightBlockNotEmpty()} are both false
+   * and both children report that they are finished.
+   */
+  @Override
+  public boolean isFinished() throws Exception {
+    // Short-circuit order matches the guard-clause form: retained block first, then left, then
+    // right, so the children are only queried when the earlier checks pass.
+    return retainedTsBlock == null
+        && !leftBlockNotEmpty()
+        && leftChild.isFinished()
+        && !rightBlockNotEmpty()
+        && rightChild.isFinished();
+  }
+
@Override
public boolean hasNext() throws Exception {
+ if (retainedTsBlock != null) {
+ return true;
+ }
+
return (leftBlockNotEmpty() || leftChild.hasNextWithTimer())
- && (rightBlockNotEmpty() || rightChild.hasNextWithTimer());
+ || (rightBlockNotEmpty() || rightChild.hasNextWithTimer());
}
@Override
public TsBlock next() throws Exception {
+ if (retainedTsBlock != null) {
+ return getResultFromRetainedTsBlock();
+ }
+ resultBuilder.reset();
+
long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
// prepare leftBlock and rightBlockList with cachedNextRightBlock
@@ -123,8 +144,30 @@ public class InnerJoinOperator implements ProcessOperator {
return null;
}
- // all the rightTsBlock is less than leftTsBlock, just skip right
+ if (leftFinished || rightFinished) {
+ if (leftFinished) {
+ appendRightWithEmptyLeft();
+ for (int i = 1; i < rightBlockList.size(); i++) {
+ memoryReservationManager.releaseMemoryCumulatively(
+ rightBlockList.get(i).getRetainedSizeInBytes());
+ }
+ rightBlockList.clear();
+ rightBlockListIdx = 0;
+ rightIndex = 0;
+ resultTsBlock = buildResultTsBlock(resultBuilder);
+ return checkTsBlockSizeAndGetResult();
+ } else {
+ appendLeftWithEmptyRight();
+ leftBlock = null;
+ leftIndex = 0;
+ resultTsBlock = buildResultTsBlock(resultBuilder);
+ return checkTsBlockSizeAndGetResult();
+ }
+ }
+
+ // all the rightTsBlock is less than leftTsBlock, append right with empty
left
if (comparator.lessThan(getRightEndTime(), getCurrentLeftTime())) {
+ appendRightWithEmptyLeft();
for (int i = 1; i < rightBlockList.size(); i++) {
memoryReservationManager.releaseMemoryCumulatively(
rightBlockList.get(i).getRetainedSizeInBytes());
@@ -135,8 +178,9 @@ public class InnerJoinOperator implements ProcessOperator {
return null;
}
- // all the leftTsBlock is less than rightTsBlock, just skip left
+ // all the leftTsBlock is less than rightTsBlock, append left with empty
right
else if (comparator.lessThan(getLeftEndTime(), getCurrentRightTime())) {
+ appendLeftWithEmptyRight();
leftBlock = null;
leftIndex = 0;
return null;
@@ -147,6 +191,7 @@ public class InnerJoinOperator implements ProcessOperator {
// all right block time is not matched
if (!comparator.canContinueInclusive(leftProbeTime, getRightEndTime())) {
+ appendRightWithEmptyLeft();
for (int i = 1; i < rightBlockList.size(); i++) {
memoryReservationManager.releaseMemoryCumulatively(
rightBlockList.get(i).getRetainedSizeInBytes());
@@ -174,63 +219,56 @@ public class InnerJoinOperator implements ProcessOperator
{
return null;
}
- Column[] valueColumns = new
Column[resultBuilder.getValueColumnBuilders().length];
- for (int i = 0; i < valueColumns.length; ++i) {
- valueColumns[i] = resultBuilder.getValueColumnBuilders()[i].build();
- if (valueColumns[i].getPositionCount() !=
resultBuilder.getPositionCount()) {
- throw new IllegalStateException(
- String.format(
- "Declared positions (%s) does not match column %s's number of
entries (%s)",
- resultBuilder.getPositionCount(), i,
valueColumns[i].getPositionCount()));
- }
- }
-
- TsBlock result =
- TsBlock.wrapBlocksWithoutCopy(
- this.resultBuilder.getPositionCount(),
- new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE,
this.resultBuilder.getPositionCount()),
- valueColumns);
- resultBuilder.reset();
- return result;
+ resultTsBlock = buildResultTsBlock(resultBuilder);
+ return checkTsBlockSizeAndGetResult();
}
private boolean prepareInput(long start, long maxRuntime) throws Exception {
- if ((leftBlock == null || leftBlock.getPositionCount() == leftIndex)
- && leftChild.hasNextWithTimer()) {
- leftBlock = leftChild.nextWithTimer();
- leftIndex = 0;
+
+ if (!leftFinished && (leftBlock == null || leftBlock.getPositionCount() ==
leftIndex)) {
+ if (leftChild.hasNextWithTimer()) {
+ leftBlock = leftChild.nextWithTimer();
+ leftIndex = 0;
+ } else {
+ leftFinished = true;
+ }
}
- if (rightBlockList.isEmpty()) {
- if (hasCachedNextRightBlock && cachedNextRightBlock != null) {
- rightBlockList.add(cachedNextRightBlock);
- hasCachedNextRightBlock = false;
- cachedNextRightBlock = null;
- tryCachedNextRightTsBlock();
- } else if (rightChild.hasNextWithTimer()) {
- TsBlock block = rightChild.nextWithTimer();
- if (block != null) {
- rightBlockList.add(block);
+ if (!rightFinished) {
+ if (rightBlockList.isEmpty()) {
+ if (hasCachedNextRightBlock && cachedNextRightBlock != null) {
+ rightBlockList.add(cachedNextRightBlock);
+ hasCachedNextRightBlock = false;
+ cachedNextRightBlock = null;
tryCachedNextRightTsBlock();
+ } else if (rightChild.hasNextWithTimer()) {
+ TsBlock block = rightChild.nextWithTimer();
+ if (block != null) {
+ rightBlockList.add(block);
+ tryCachedNextRightTsBlock();
+ }
+ } else {
+ rightFinished = true;
+ hasCachedNextRightBlock = true;
+ cachedNextRightBlock = null;
}
} else {
- hasCachedNextRightBlock = true;
- cachedNextRightBlock = null;
- }
- } else {
- if (!hasCachedNextRightBlock) {
- tryCachedNextRightTsBlock();
+ if (!hasCachedNextRightBlock) {
+ tryCachedNextRightTsBlock();
+ }
}
}
- return leftBlockNotEmpty() && rightBlockNotEmpty() &&
hasCachedNextRightBlock;
+ return (leftBlockNotEmpty() && rightBlockNotEmpty() &&
hasCachedNextRightBlock)
+ || (leftBlockNotEmpty() && rightFinished)
+ || (leftFinished && rightBlockNotEmpty() && hasCachedNextRightBlock);
}
private void tryCachedNextRightTsBlock() throws Exception {
if (rightChild.hasNextWithTimer()) {
TsBlock block = rightChild.nextWithTimer();
if (block != null) {
- if (block.getColumn(TIME_COLUMN_POSITION).getLong(0) ==
getRightEndTime()) {
+ if (block.getColumn(rightTimeColumnPosition).getLong(0) ==
getRightEndTime()) {
memoryReservationManager.reserveMemoryCumulatively(block.getRetainedSizeInBytes());
rightBlockList.add(block);
} else {
@@ -245,34 +283,38 @@ public class InnerJoinOperator implements ProcessOperator
{
}
private long getCurrentLeftTime() {
- return leftBlock.getColumn(TIME_COLUMN_POSITION).getLong(leftIndex);
+ return leftBlock.getColumn(leftTimeColumnPosition).getLong(leftIndex);
}
private long getLeftEndTime() {
- return
leftBlock.getColumn(TIME_COLUMN_POSITION).getLong(leftBlock.getPositionCount()
- 1);
+ return
leftBlock.getColumn(leftTimeColumnPosition).getLong(leftBlock.getPositionCount()
- 1);
}
private long getCurrentRightTime() {
return rightBlockList
.get(rightBlockListIdx)
- .getColumn(TIME_COLUMN_POSITION)
+ .getColumn(rightTimeColumnPosition)
.getLong(rightIndex);
}
private long getRightTime(int idx1, int idx2) {
- return
rightBlockList.get(idx1).getColumn(TIME_COLUMN_POSITION).getLong(idx2);
+ return
rightBlockList.get(idx1).getColumn(rightTimeColumnPosition).getLong(idx2);
}
private long getRightEndTime() {
TsBlock lastRightTsBlock = rightBlockList.get(rightBlockList.size() - 1);
return lastRightTsBlock
- .getColumn(TIME_COLUMN_POSITION)
+ .getColumn(rightTimeColumnPosition)
.getLong(lastRightTsBlock.getPositionCount() - 1);
}
private void appendResult(long leftTime) {
while (comparator.lessThan(getCurrentRightTime(), leftTime)) {
+ if (getCurrentRightTime() > lastMatchedRightTime) {
+ appendOneRightRowWithEmptyLeft();
+ }
+
rightIndex++;
if (rightIndex >=
rightBlockList.get(rightBlockListIdx).getPositionCount()) {
@@ -289,6 +331,9 @@ public class InnerJoinOperator implements ProcessOperator {
int tmpBlockIdx = rightBlockListIdx, tmpIdx = rightIndex;
while (leftTime == getRightTime(tmpBlockIdx, tmpIdx)) {
+ // lastMatchedRightBlockListIdx = rightBlockListIdx;
+ // lastMatchedRightIdx = rightIndex;
+ lastMatchedRightTime = leftTime;
appendValueToResult(tmpBlockIdx, tmpIdx);
resultBuilder.declarePosition();
@@ -305,6 +350,87 @@ public class InnerJoinOperator implements ProcessOperator {
}
}
+  /**
+   * Emits every remaining row of {@code leftBlock}, starting at {@code leftIndex}, as an unmatched
+   * left row: the left output columns are copied (nulls preserved) and every right output column
+   * is filled with null. {@code leftIndex} ends at the block's position count.
+   */
+  private void appendLeftWithEmptyRight() {
+    final int leftWidth = leftOutputSymbolIdx.length;
+    final int totalWidth = leftWidth + rightOutputSymbolIdx.length;
+
+    for (; leftIndex < leftBlock.getPositionCount(); leftIndex++) {
+      // left side: copy the value, or null when the source cell is null
+      for (int col = 0; col < leftWidth; col++) {
+        ColumnBuilder target = resultBuilder.getColumnBuilder(col);
+        if (!leftBlock.getColumn(leftOutputSymbolIdx[col]).isNull(leftIndex)) {
+          target.write(leftBlock.getColumn(leftOutputSymbolIdx[col]), leftIndex);
+        } else {
+          target.appendNull();
+        }
+      }
+
+      // right side: no matching row, so pad with nulls
+      for (int col = leftWidth; col < totalWidth; col++) {
+        resultBuilder.getColumnBuilder(col).appendNull();
+      }
+
+      resultBuilder.declarePosition();
+    }
+  }
+
+  /**
+   * Walks all remaining buffered right rows, from ({@code rightBlockListIdx}, {@code rightIndex})
+   * to the end of {@code rightBlockList}, and emits each one as an unmatched right row (left
+   * output columns null, right output columns copied).
+   *
+   * <p>Rows whose time is not greater than {@code lastMatchedRightTime} are skipped instead of
+   * emitted. That field is set to the left time whenever a matching pair is appended, so such rows
+   * have presumably already been joined.
+   *
+   * <p>On return {@code rightBlockListIdx == rightBlockList.size()} and {@code rightIndex == 0}.
+   */
+  private void appendRightWithEmptyLeft() {
+    while (rightBlockListIdx < rightBlockList.size()) {
+      if (getCurrentRightTime() > lastMatchedRightTime) {
+        // Single-row emission lives in one place instead of being duplicated here.
+        appendOneRightRowWithEmptyLeft();
+      }
+
+      // advance the cursor, rolling over to the next buffered right block when needed
+      rightIndex++;
+      if (rightIndex >= rightBlockList.get(rightBlockListIdx).getPositionCount()) {
+        rightIndex = 0;
+        rightBlockListIdx++;
+      }
+    }
+  }
+
+  /**
+   * Emits the right row at ({@code rightBlockListIdx}, {@code rightIndex}) as an unmatched row:
+   * every left output column gets null, every right output column gets the row's value (or null
+   * when the source cell is null). The right cursor is not advanced here.
+   */
+  private void appendOneRightRowWithEmptyLeft() {
+    final int leftWidth = leftOutputSymbolIdx.length;
+    for (int col = 0; col < leftWidth; col++) {
+      resultBuilder.getColumnBuilder(col).appendNull();
+    }
+
+    final TsBlock currentRightBlock = rightBlockList.get(rightBlockListIdx);
+    for (int col = 0; col < rightOutputSymbolIdx.length; col++) {
+      // right columns are laid out after the left ones in the result builder
+      ColumnBuilder target = resultBuilder.getColumnBuilder(leftWidth + col);
+      if (currentRightBlock.getColumn(rightOutputSymbolIdx[col]).isNull(rightIndex)) {
+        target.appendNull();
+      } else {
+        target.write(currentRightBlock.getColumn(rightOutputSymbolIdx[col]), rightIndex);
+      }
+    }
+
+    resultBuilder.declarePosition();
+  }
+
private boolean leftBlockNotEmpty() {
return leftBlock != null && leftIndex < leftBlock.getPositionCount();
}
@@ -341,14 +467,6 @@ public class InnerJoinOperator implements ProcessOperator {
}
}
- @Override
- public boolean isFinished() throws Exception {
- return !leftBlockNotEmpty()
- && leftChild.isFinished()
- && !rightBlockNotEmpty()
- && rightChild.isFinished();
- }
-
@Override
public void close() throws Exception {
if (leftChild != null) {
@@ -365,11 +483,6 @@ public class InnerJoinOperator implements ProcessOperator {
}
}
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
@Override
public long calculateMaxPeekMemory() {
return Math.max(
@@ -381,7 +494,7 @@ public class InnerJoinOperator implements ProcessOperator {
@Override
public long calculateMaxReturnSize() {
- return maxReturnSize;
+ return maxReturnSize * 2;
}
@Override
@@ -391,7 +504,8 @@ public class InnerJoinOperator implements ProcessOperator {
return leftChild.calculateMaxReturnSize()
+ leftChild.calculateRetainedSizeAfterCallingNext()
+ rightChild.calculateMaxReturnSize()
- + rightChild.calculateRetainedSizeAfterCallingNext();
+ + rightChild.calculateRetainedSizeAfterCallingNext()
+ + maxReturnSize;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InnerJoinOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java
similarity index 87%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InnerJoinOperator.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java
index c086666ece5..73963c2359b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InnerJoinOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java
@@ -20,16 +20,15 @@
package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
@@ -43,20 +42,19 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.successfulAsList;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
-public class InnerJoinOperator implements ProcessOperator {
+public class TableInnerJoinOperator extends AbstractOperator {
private static final long INSTANCE_SIZE =
- RamUsageEstimator.shallowSizeOfInstance(InnerJoinOperator.class);
-
- private final OperatorContext operatorContext;
+ RamUsageEstimator.shallowSizeOfInstance(TableInnerJoinOperator.class);
private final Operator leftChild;
private TsBlock leftBlock;
private int leftIndex; // start index of leftTsBlock
- private static final int TIME_COLUMN_POSITION = 0;
+ private final int leftTimeColumnPosition;
private final int[] leftOutputSymbolIdx;
private final Operator rightChild;
private final List<TsBlock> rightBlockList = new ArrayList<>();
+ private final int rightTimeColumnPosition;
private int rightBlockListIdx;
private int rightIndex; // start index of rightTsBlock
private final int[] rightOutputSymbolIdx;
@@ -66,23 +64,24 @@ public class InnerJoinOperator implements ProcessOperator {
private final TimeComparator comparator;
private final TsBlockBuilder resultBuilder;
- private final long maxReturnSize =
- TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
-
protected MemoryReservationManager memoryReservationManager;
- public InnerJoinOperator(
+ public TableInnerJoinOperator(
OperatorContext operatorContext,
Operator leftChild,
+ int leftTimeColumnPosition,
int[] leftOutputSymbolIdx,
Operator rightChild,
+ int rightTimeColumnPosition,
int[] rightOutputSymbolIdx,
TimeComparator timeComparator,
List<TSDataType> dataTypes) {
this.operatorContext = operatorContext;
this.leftChild = leftChild;
+ this.leftTimeColumnPosition = leftTimeColumnPosition;
this.leftOutputSymbolIdx = leftOutputSymbolIdx;
this.rightChild = rightChild;
+ this.rightTimeColumnPosition = rightTimeColumnPosition;
this.rightOutputSymbolIdx = rightOutputSymbolIdx;
this.comparator = timeComparator;
@@ -108,14 +107,35 @@ public class InnerJoinOperator implements ProcessOperator
{
}
}
+  /**
+   * Reports whether this join has nothing more to produce.
+   *
+   * <p>The operator is finished only when no retained result block is pending, {@code
+   * leftBlockNotEmpty()} and {@code rightBlockNotEmpty()} are both false, and both children report
+   * that they are finished.
+   */
+  @Override
+  public boolean isFinished() throws Exception {
+    // A retained block is output that has not been returned yet: hasNext() answers true for it
+    // and next() drains it first. Reporting "finished" here would drop those rows, so this must
+    // be false (it previously returned true).
+    if (retainedTsBlock != null) {
+      return false;
+    }
+
+    return !leftBlockNotEmpty()
+        && leftChild.isFinished()
+        && !rightBlockNotEmpty()
+        && rightChild.isFinished();
+  }
+
@Override
public boolean hasNext() throws Exception {
+ if (retainedTsBlock != null) {
+ return true; // next() serves the retained block first, before any new input is joined
+ }
+
return (leftBlockNotEmpty() || leftChild.hasNextWithTimer())
&& (rightBlockNotEmpty() || rightChild.hasNextWithTimer());
}
@Override
public TsBlock next() throws Exception {
+ if (retainedTsBlock != null) {
+ return getResultFromRetainedTsBlock();
+ }
+ resultBuilder.reset();
+
long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
// prepare leftBlock and rightBlockList with cachedNextRightBlock
@@ -136,7 +156,7 @@ public class InnerJoinOperator implements ProcessOperator {
}
// all the leftTsBlock is less than rightTsBlock, just skip left
- else if (comparator.lessThan(getLeftEndTime(), getCurrentRightTime())) {
+ else if (comparator.lessThan(getLeftEndTime(),
getRightTime(rightBlockListIdx, rightIndex))) {
leftBlock = null;
leftIndex = 0;
return null;
@@ -174,24 +194,8 @@ public class InnerJoinOperator implements ProcessOperator {
return null;
}
- Column[] valueColumns = new
Column[resultBuilder.getValueColumnBuilders().length];
- for (int i = 0; i < valueColumns.length; ++i) {
- valueColumns[i] = resultBuilder.getValueColumnBuilders()[i].build();
- if (valueColumns[i].getPositionCount() !=
resultBuilder.getPositionCount()) {
- throw new IllegalStateException(
- String.format(
- "Declared positions (%s) does not match column %s's number of
entries (%s)",
- resultBuilder.getPositionCount(), i,
valueColumns[i].getPositionCount()));
- }
- }
-
- TsBlock result =
- TsBlock.wrapBlocksWithoutCopy(
- this.resultBuilder.getPositionCount(),
- new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE,
this.resultBuilder.getPositionCount()),
- valueColumns);
- resultBuilder.reset();
- return result;
+ resultTsBlock = buildResultTsBlock(resultBuilder);
+ return checkTsBlockSizeAndGetResult();
}
private boolean prepareInput(long start, long maxRuntime) throws Exception {
@@ -230,7 +234,7 @@ public class InnerJoinOperator implements ProcessOperator {
if (rightChild.hasNextWithTimer()) {
TsBlock block = rightChild.nextWithTimer();
if (block != null) {
- if (block.getColumn(TIME_COLUMN_POSITION).getLong(0) ==
getRightEndTime()) {
+ if (block.getColumn(rightTimeColumnPosition).getLong(0) ==
getRightEndTime()) {
memoryReservationManager.reserveMemoryCumulatively(block.getRetainedSizeInBytes());
rightBlockList.add(block);
} else {
@@ -245,28 +249,25 @@ public class InnerJoinOperator implements ProcessOperator
{
}
private long getCurrentLeftTime() {
- return leftBlock.getColumn(TIME_COLUMN_POSITION).getLong(leftIndex);
+ return leftBlock.getColumn(leftTimeColumnPosition).getLong(leftIndex);
}
private long getLeftEndTime() {
- return
leftBlock.getColumn(TIME_COLUMN_POSITION).getLong(leftBlock.getPositionCount()
- 1);
+ return
leftBlock.getColumn(leftTimeColumnPosition).getLong(leftBlock.getPositionCount()
- 1);
}
- private long getCurrentRightTime() {
- return rightBlockList
- .get(rightBlockListIdx)
- .getColumn(TIME_COLUMN_POSITION)
- .getLong(rightIndex);
+ private long getRightTime(int blockIdx, int rowIdx) {
+ return
rightBlockList.get(blockIdx).getColumn(rightTimeColumnPosition).getLong(rowIdx);
}
- private long getRightTime(int idx1, int idx2) {
- return
rightBlockList.get(idx1).getColumn(TIME_COLUMN_POSITION).getLong(idx2);
+ private long getCurrentRightTime() {
+ return getRightTime(rightBlockListIdx, rightIndex);
}
private long getRightEndTime() {
TsBlock lastRightTsBlock = rightBlockList.get(rightBlockList.size() - 1);
return lastRightTsBlock
- .getColumn(TIME_COLUMN_POSITION)
+ .getColumn(rightTimeColumnPosition)
.getLong(lastRightTsBlock.getPositionCount() - 1);
}
@@ -341,12 +342,25 @@ public class InnerJoinOperator implements ProcessOperator
{
}
}
- @Override
- public boolean isFinished() throws Exception {
- return !leftBlockNotEmpty()
- && leftChild.isFinished()
- && !rightBlockNotEmpty()
- && rightChild.isFinished();
+ public static TsBlock buildResultTsBlock(TsBlockBuilder resultBuilder) { // wraps the builder's value columns plus a run-length-encoded time column into one TsBlock, then resets the builder
+ Column[] valueColumns = new
Column[resultBuilder.getValueColumnBuilders().length];
+ for (int i = 0; i < valueColumns.length; ++i) {
+ valueColumns[i] = resultBuilder.getValueColumnBuilders()[i].build(); // materialize column i from its builder
+ if (valueColumns[i].getPositionCount() !=
resultBuilder.getPositionCount()) { // every built column must hold exactly the declared row count
+ throw new IllegalStateException(
+ String.format(
+ "Declared positions (%s) does not match column %s's number of
entries (%s)",
+ resultBuilder.getPositionCount(), i,
valueColumns[i].getPositionCount()));
+ }
+ }
+
+ TsBlock result =
+ TsBlock.wrapBlocksWithoutCopy(
+ resultBuilder.getPositionCount(),
+ new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE,
resultBuilder.getPositionCount()),
+ valueColumns);
+ resultBuilder.reset(); // clear the builder now that its contents are wrapped into result
+ return result;
+ }
@Override
@@ -365,11 +379,6 @@ public class InnerJoinOperator implements ProcessOperator {
}
}
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
@Override
public long calculateMaxPeekMemory() {
return Math.max(
@@ -391,7 +400,8 @@ public class InnerJoinOperator implements ProcessOperator {
return leftChild.calculateMaxReturnSize()
+ leftChild.calculateRetainedSizeAfterCallingNext()
+ rightChild.calculateMaxReturnSize()
- + rightChild.calculateRetainedSizeAfterCallingNext();
+ + rightChild.calculateRetainedSizeAfterCallingNext()
+ + maxReturnSize;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 754cde57912..b0b520c92bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -49,7 +49,8 @@ import
org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSo
import
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
-import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.InnerJoinOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableFullOuterJoinOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableInnerJoinOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.relational.ColumnTransformerBuilder;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
@@ -760,11 +761,15 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
Operator leftChild = node.getLeftChild().accept(this, context);
Operator rightChild = node.getRightChild().accept(this, context);
+ int leftTimeColumnPosition =
+
node.getLeftChild().getOutputSymbols().indexOf(node.getCriteria().get(0).getLeft());
int[] leftOutputSymbolIdx = new int[node.getLeftOutputSymbols().size()];
for (int i = 0; i < leftOutputSymbolIdx.length; i++) {
leftOutputSymbolIdx[i] =
node.getLeftChild().getOutputSymbols().indexOf(node.getLeftOutputSymbols().get(i));
}
+ int rightTimeColumnPosition =
+
node.getRightChild().getOutputSymbols().indexOf(node.getCriteria().get(0).getRight());
int[] rightOutputSymbolIdx = new int[node.getRightOutputSymbols().size()];
for (int i = 0; i < rightOutputSymbolIdx.length; i++) {
rightOutputSymbolIdx[i] =
@@ -772,11 +777,24 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
}
if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.INNER) {
- return new InnerJoinOperator(
+ return new TableInnerJoinOperator(
operatorContext,
leftChild,
+ leftTimeColumnPosition,
leftOutputSymbolIdx,
rightChild,
+ rightTimeColumnPosition,
+ rightOutputSymbolIdx,
+ ASC_TIME_COMPARATOR,
+ dataTypes);
+ } else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.FULL) {
+ return new TableFullOuterJoinOperator(
+ operatorContext,
+ leftChild,
+ leftTimeColumnPosition,
+ leftOutputSymbolIdx,
+ rightChild,
+ rightTimeColumnPosition,
rightOutputSymbolIdx,
ASC_TIME_COMPARATOR,
dataTypes);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index a692ef5ed3b..e098d231336 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -1895,10 +1895,7 @@ public class StatementAnalyzer {
createAndAssignScope(
node, scope,
left.getRelationType().joinWith(right.getRelationType()));
- if (node.getType() == Join.Type.CROSS
- || node.getType() == LEFT
- || node.getType() == RIGHT
- || node.getType() == FULL) {
+ if (node.getType() == Join.Type.CROSS || node.getType() == LEFT ||
node.getType() == RIGHT) {
throw new SemanticException(
String.format(
"%s JOIN is not supported, only support INNER JOIN in current
version.",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 37bbc69c060..c587774bcef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CoalesceExpression;
import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
@@ -83,6 +84,7 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.planner.QueryPlann
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.QueryPlanner.coerceIfNecessary;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.extractPredicates;
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.CROSS;
+import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.FULL;
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.IMPLICIT;
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.INNER;
@@ -329,12 +331,15 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
Symbol output = symbolAllocator.newSymbol(column,
analysis.getType(column));
outputs.add(output);
queryContext.getTypeProvider().putTableModelType(output, LongType.INT64);
- assignments.put(
- output, leftJoinColumns.get(column).toSymbolReference()
- // new CoalesceExpression(
- // leftJoinColumns.get(column).toSymbolReference(),
- // rightJoinColumns.get(column).toSymbolReference())
- );
+ if (node.getType() == INNER) {
+ assignments.put(output,
leftJoinColumns.get(column).toSymbolReference());
+ } else if (node.getType() == FULL) {
+ assignments.put(
+ output,
+ new CoalesceExpression(
+ leftJoinColumns.get(column).toSymbolReference(),
+ rightJoinColumns.get(column).toSymbolReference()));
+ }
}
for (int field : joinAnalysis.getOtherLeftFields()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
index 2d635afd644..90c2096070b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java
@@ -84,10 +84,8 @@ public class PushLimitOffsetIntoTableScan implements
PlanOptimizer {
node.setLeftChild(leftChild);
node.setRightChild(rightChild);
- // TODO(beyyes) when outer, left, right join is introduced, fix the
condition
- if (node.getJoinType() == JoinNode.JoinType.INNER) {
- context.enablePushDown = false;
- }
+ // TODO(beyyes) optimize for outer, left, right join
+ context.enablePushDown = false;
return node;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index 241e035a033..d70ce7e0b8f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -88,7 +88,6 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinN
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.extractJoinPredicate;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.joinEqualityExpression;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.processInnerJoin;
-import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.tryNormalizeToOuterToInnerJoin;
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
/**
@@ -519,7 +518,7 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
context.inheritedPredicate != null ? context.inheritedPredicate :
TRUE_LITERAL;
// See if we can rewrite outer joins in terms of a plain inner join
- node = tryNormalizeToOuterToInnerJoin(node, inheritedPredicate);
+ // node = tryNormalizeToOuterToInnerJoin(node, inheritedPredicate);
Expression leftEffectivePredicate = TRUE_LITERAL;
// effectivePredicateExtractor.extract(session, node.getLeftChild(),
types, typeAnalyzer);
@@ -548,6 +547,12 @@ public class PushPredicateIntoTableScan implements
PlanOptimizer {
postJoinPredicate = innerJoinPushDownResult.getPostJoinPredicate();
newJoinPredicate = innerJoinPushDownResult.getJoinPredicate();
break;
+ case FULL:
+ leftPredicate = TRUE_LITERAL;
+ rightPredicate = TRUE_LITERAL;
+ postJoinPredicate = inheritedPredicate;
+ newJoinPredicate = joinPredicate;
+ break;
default:
throw new IllegalStateException("Only support INNER JOIN in current
version");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/CoalesceColumnTransformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/CoalesceColumnTransformer.java
index c6f84fd8eef..f88c60ba74d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/CoalesceColumnTransformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/CoalesceColumnTransformer.java
@@ -42,6 +42,7 @@ public class CoalesceColumnTransformer extends
MultiColumnTransformer {
if (!column.isNull(i)) {
allNull = false;
builder.write(column, i);
+ break;
}
}
if (allNull) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
index fdcf49a9142..13652a7c9c7 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/JoinTest.java
@@ -385,15 +385,7 @@ public class JoinTest {
"SELECT * FROM table1 t1 INNER JOIN table1 t2 USING(tag1, time)",
ONLY_SUPPORT_TIME_COLUMN_IN_USING_CLAUSE);
- // FULL, LEFT, RIGHT JOIN
- assertAnalyzeSemanticException(
- "SELECT * FROM table1 t1 FULL JOIN table1 t2 ON t1.time=t2.time",
- "FULL JOIN is not supported, only support INNER JOIN in current
version");
-
- assertAnalyzeSemanticException(
- "SELECT * FROM table1 t1 FULL JOIN table1 t2 ON t1.time=t2.time WHERE
t1.time>1",
- "FULL JOIN is not supported, only support INNER JOIN in current
version");
-
+ // LEFT, RIGHT JOIN
assertAnalyzeSemanticException(
"SELECT * FROM table1 t1 LEFT JOIN table1 t2 ON t1.time=t2.time",
"LEFT JOIN is not supported, only support INNER JOIN in current
version");