This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/sonar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/sonar by this push:
new 206468e4fba
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join
206468e4fba is described below
commit 206468e4fbadc1f418c4b499a382cf9e05096898
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 21 10:01:17 2023 +0800
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join
---
.../process/join/HorizontallyConcatOperator.java | 11 +-
.../process/join/RowBasedTimeJoinOperator.java | 66 ++--
.../operator/process/join/TimeJoinOperator.java | 292 ---------------
.../process/join/merge/AscTimeComparator.java | 3 +-
.../operator/process/join/merge/ColumnMerger.java | 9 +-
.../process/join/merge/DescTimeComparator.java | 3 +-
.../process/join/merge/MergeSortComparator.java | 10 +-
.../process/join/merge/MultiColumnMerger.java | 12 +-
.../join/merge/NonOverlappedMultiColumnMerger.java | 5 +-
.../process/join/merge/SingleColumnMerger.java | 26 +-
.../process/join/merge/SortKeyComparator.java | 9 +-
.../process/join/merge/TimeComparator.java | 7 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 10 +-
.../execution/operator/TimeJoinOperatorTest.java | 414 ---------------------
14 files changed, 100 insertions(+), 777 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
index 2bb083b9db4..2c4c6e30828 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/HorizontallyConcatOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.join;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
@@ -42,7 +43,7 @@ import static
com.google.common.base.Preconditions.checkArgument;
*/
public class HorizontallyConcatOperator extends AbstractConsumeAllOperator {
- /** start index for each input TsBlocks and size of it is equal to
inputTsBlocks */
+ /** start index for each input TsBlocks and size of it is equal to
inputTsBlocks. */
private final int[] inputIndex;
private final TsBlockBuilder tsBlockBuilder;
@@ -116,10 +117,11 @@ public class HorizontallyConcatOperator extends
AbstractConsumeAllOperator {
if (finished) {
return true;
}
- return finished =
+ finished =
isEmpty(readyChildIndex)
&& (children.get(readyChildIndex) == null
|| !children.get(readyChildIndex).hasNextWithTimer());
+ return finished;
}
@Override
@@ -144,7 +146,8 @@ public class HorizontallyConcatOperator extends
AbstractConsumeAllOperator {
@Override
public long calculateRetainedSizeAfterCallingNext() {
- long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+ long currentRetainedSize = 0;
+ long minChildReturnSize = Long.MAX_VALUE;
for (Operator child : children) {
long maxReturnSize = child.calculateMaxReturnSize();
currentRetainedSize += (maxReturnSize +
child.calculateRetainedSizeAfterCallingNext());
@@ -156,7 +159,7 @@ public class HorizontallyConcatOperator extends
AbstractConsumeAllOperator {
/**
* If the tsBlock of tsBlockIndex is null or has no more data in the
tsBlock, return true; else
- * return false;
+ * return false.
*/
@Override
protected boolean isEmpty(int tsBlockIndex) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 1912d04ce4b..d5258682931 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.join;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -42,10 +43,10 @@ import static
com.google.common.util.concurrent.Futures.successfulAsList;
public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator {
- /** start index for each input TsBlocks and size of it is equal to
inputTsBlocks */
+ /** start index for each input TsBlocks and size of it is equal to
inputTsBlocks. */
private final int[] inputIndex;
- /** used to record current index for input TsBlocks after merging */
+ /** used to record current index for input TsBlocks after merging. */
private final int[] shadowInputIndex;
/**
@@ -61,7 +62,7 @@ public class RowBasedTimeJoinOperator extends
AbstractConsumeAllOperator {
/**
* this field indicates each data type for output columns(not including time
column) of
- * TimeJoinOperator its size should be equal to outputColumnCount
+ * TimeJoinOperator its size should be equal to outputColumnCount.
*/
private final List<TSDataType> dataTypes;
@@ -149,8 +150,8 @@ public class RowBasedTimeJoinOperator extends
AbstractConsumeAllOperator {
if (timeSelector.isEmpty()) {
// return empty TsBlock
- TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(0, dataTypes);
- return tsBlockBuilder.build();
+ TsBlockBuilder emptyTsBlockBuilder = new TsBlockBuilder(0, dataTypes);
+ return emptyTsBlockBuilder.build();
}
TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
@@ -158,31 +159,41 @@ public class RowBasedTimeJoinOperator extends
AbstractConsumeAllOperator {
do {
currentTime = timeSelector.pollFirst();
timeBuilder.writeLong(currentTime);
- for (int i = 0; i < outputColumnCount; i++) {
- ColumnMerger merger = mergers.get(i);
- merger.mergeColumn(
- inputTsBlocks,
- inputIndex,
- shadowInputIndex,
- currentTime,
- tsBlockBuilder.getColumnBuilder(i));
- }
- for (int i = 0; i < inputOperatorsCount; i++) {
- if (inputIndex[i] != shadowInputIndex[i]) {
- inputIndex[i] = shadowInputIndex[i];
- if (!isEmpty(i)) {
- updateTimeSelector(i);
- }
- }
- }
+ appendOneRow(currentTime);
tsBlockBuilder.declarePosition();
+
+ prepareForTimeHeap();
+
} while (comparator.canContinue(currentTime, currentEndTime) &&
!timeSelector.isEmpty());
resultTsBlock = tsBlockBuilder.build();
return checkTsBlockSizeAndGetResult();
}
+ private void appendOneRow(long currentTime) {
+ for (int i = 0; i < outputColumnCount; i++) {
+ ColumnMerger merger = mergers.get(i);
+ merger.mergeColumn(
+ inputTsBlocks,
+ inputIndex,
+ shadowInputIndex,
+ currentTime,
+ tsBlockBuilder.getColumnBuilder(i));
+ }
+ }
+
+ private void prepareForTimeHeap() {
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (inputIndex[i] != shadowInputIndex[i]) {
+ inputIndex[i] = shadowInputIndex[i];
+ if (!isEmpty(i)) {
+ updateTimeSelector(i);
+ }
+ }
+ }
+ }
+
@Override
public boolean hasNext() throws Exception {
if (finished) {
@@ -250,7 +261,8 @@ public class RowBasedTimeJoinOperator extends
AbstractConsumeAllOperator {
@Override
public long calculateRetainedSizeAfterCallingNext() {
- long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+ long currentRetainedSize = 0;
+ long minChildReturnSize = Long.MAX_VALUE;
for (Operator child : children) {
long maxReturnSize = child.calculateMaxReturnSize();
currentRetainedSize += (maxReturnSize +
child.calculateRetainedSizeAfterCallingNext());
@@ -274,7 +286,7 @@ public class RowBasedTimeJoinOperator extends
AbstractConsumeAllOperator {
protected boolean prepareInput() throws Exception {
boolean allReady = true;
for (int i = 0; i < inputOperatorsCount; i++) {
- if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
+ if (needCallNext(i)) {
continue;
}
if (canCallNext[i]) {
@@ -300,6 +312,10 @@ public class RowBasedTimeJoinOperator extends
AbstractConsumeAllOperator {
return allReady;
}
+ private boolean needCallNext(int i) {
+ return noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null;
+ }
+
@TestOnly
public List<Operator> getChildren() {
return children;
@@ -313,7 +329,7 @@ public class RowBasedTimeJoinOperator extends
AbstractConsumeAllOperator {
/**
* If the tsBlock of columnIndex is null or has no more data in the tsBlock,
return true; else
- * return false;
+ * return false.
*/
@Override
protected boolean isEmpty(int columnIndex) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
deleted file mode 100644
index 2b60ad6d3db..00000000000
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/TimeJoinOperator.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.execution.operator.process.join;
-
-import org.apache.iotdb.db.mpp.execution.operator.AbstractOperator;
-import org.apache.iotdb.db.mpp.execution.operator.Operator;
-import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
-import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.TimeComparator;
-import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
-import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.util.concurrent.Futures.successfulAsList;
-
-@Deprecated
-public class TimeJoinOperator extends AbstractOperator {
-
- private final List<Operator> children;
-
- private final int inputOperatorsCount;
-
- /** TsBlock from child operator. Only one cache now. */
- private final TsBlock[] inputTsBlocks;
-
- /** start index for each input TsBlocks and size of it is equal to
inputTsBlocks */
- private final int[] inputIndex;
-
- /** used to record current index for input TsBlocks after merging */
- private final int[] shadowInputIndex;
-
- /**
- * Represent whether there are more tsBlocks from ith child operator. If all
elements in
- * noMoreTsBlocks[] are true and inputTsBlocks[] are consumed completely,
this operator is
- * finished.
- */
- private final boolean[] noMoreTsBlocks;
-
- private final TimeSelector timeSelector;
-
- private final int outputColumnCount;
-
- /**
- * this field indicates each data type for output columns(not including time
column) of
- * TimeJoinOperator its size should be equal to outputColumnCount
- */
- private final List<TSDataType> dataTypes;
-
- private final List<ColumnMerger> mergers;
-
- private final TsBlockBuilder tsBlockBuilder;
-
- private boolean finished;
-
- private final TimeComparator comparator;
-
- public TimeJoinOperator(
- OperatorContext operatorContext,
- List<Operator> children,
- Ordering mergeOrder,
- List<TSDataType> dataTypes,
- List<ColumnMerger> mergers,
- TimeComparator comparator) {
- checkArgument(
- children != null && !children.isEmpty(),
- "child size of TimeJoinOperator should be larger than 0");
- this.operatorContext = operatorContext;
- this.children = children;
- this.inputOperatorsCount = children.size();
- this.inputTsBlocks = new TsBlock[this.inputOperatorsCount];
- this.inputIndex = new int[this.inputOperatorsCount];
- this.shadowInputIndex = new int[this.inputOperatorsCount];
- this.noMoreTsBlocks = new boolean[this.inputOperatorsCount];
- this.timeSelector = new TimeSelector(this.inputOperatorsCount << 1,
Ordering.ASC == mergeOrder);
- this.outputColumnCount = dataTypes.size();
- this.dataTypes = dataTypes;
- this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
- this.mergers = mergers;
- this.comparator = comparator;
- }
-
- @Override
- public ListenableFuture<?> isBlocked() {
- List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
- for (int i = 0; i < inputOperatorsCount; i++) {
- if (!noMoreTsBlocks[i] && empty(i)) {
- ListenableFuture<?> blocked = children.get(i).isBlocked();
- if (!blocked.isDone()) {
- listenableFutures.add(blocked);
- }
- }
- }
- return listenableFutures.isEmpty() ? NOT_BLOCKED :
successfulAsList(listenableFutures);
- }
-
- @Override
- public TsBlock next() throws Exception {
- if (retainedTsBlock != null) {
- return getResultFromRetainedTsBlock();
- }
- tsBlockBuilder.reset();
- // end time for returned TsBlock this time, it's the min/max end time
among all the children
- // TsBlocks order by asc/desc
- long currentEndTime = 0;
- boolean init = false;
-
- // get TsBlock for each input, put their time stamp into TimeSelector and
then use the min Time
- // among all the input TsBlock as the current output TsBlock's endTime.
- for (int i = 0; i < inputOperatorsCount; i++) {
- if (!noMoreTsBlocks[i] && empty(i)) {
- if (children.get(i).hasNextWithTimer()) {
- inputIndex[i] = 0;
- inputTsBlocks[i] = children.get(i).nextWithTimer();
- if (!empty(i)) {
- int rowSize = inputTsBlocks[i].getPositionCount();
- for (int row = 0; row < rowSize; row++) {
- timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
- }
- } else {
- // child operator has next but return an empty TsBlock which means
that it may not
- // finish calculation in given time slice.
- // In such case, TimeJoinOperator can't go on calculating, so we
just return null.
- // We can also use the while loop here to continuously call the
hasNext() and next()
- // methods of the child operator until its hasNext() returns false
or the next() gets
- // the data that is not empty, but this will cause the execution
time of the while
- // loop
- // to be uncontrollable and may exceed all allocated time slice
- return null;
- }
- } else { // no more tsBlock
- noMoreTsBlocks[i] = true;
- inputTsBlocks[i] = null;
- }
- }
- // update the currentEndTime if the TsBlock is not empty
- if (!empty(i)) {
- currentEndTime =
- init
- ? comparator.getCurrentEndTime(currentEndTime,
inputTsBlocks[i].getEndTime())
- : inputTsBlocks[i].getEndTime();
- init = true;
- }
- }
-
- if (timeSelector.isEmpty()) {
- // return empty TsBlock
- TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(0, dataTypes);
- return tsBlockBuilder.build();
- }
-
- TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
- while (!timeSelector.isEmpty()
- && comparator.satisfyCurEndTime(timeSelector.first(), currentEndTime))
{
- timeBuilder.writeLong(timeSelector.pollFirst());
- tsBlockBuilder.declarePosition();
- }
-
- for (int i = 0; i < outputColumnCount; i++) {
- ColumnMerger merger = mergers.get(i);
- merger.mergeColumn(
- inputTsBlocks,
- inputIndex,
- shadowInputIndex,
- timeBuilder,
- currentEndTime,
- tsBlockBuilder.getColumnBuilder(i));
- }
-
- // update inputIndex using shadowInputIndex
- System.arraycopy(shadowInputIndex, 0, inputIndex, 0, inputOperatorsCount);
-
- resultTsBlock = tsBlockBuilder.build();
- return checkTsBlockSizeAndGetResult();
- }
-
- @Override
- public boolean hasNext() throws Exception {
- if (finished) {
- return false;
- }
- if (retainedTsBlock != null) {
- return true;
- }
- for (int i = 0; i < inputOperatorsCount; i++) {
- if (!empty(i)) {
- return true;
- } else if (!noMoreTsBlocks[i]) {
- if (children.get(i).hasNextWithTimer()) {
- return true;
- } else {
- noMoreTsBlocks[i] = true;
- inputTsBlocks[i] = null;
- }
- }
- }
- return false;
- }
-
- @Override
- public void close() throws Exception {
- for (Operator child : children) {
- child.close();
- }
- }
-
- @Override
- public boolean isFinished() throws Exception {
- if (finished) {
- return true;
- }
- if (retainedTsBlock != null) {
- return false;
- }
-
- finished = true;
- for (int i = 0; i < inputOperatorsCount; i++) {
- // has more tsBlock output from children[i] or has cached tsBlock in
inputTsBlocks[i]
- if (!noMoreTsBlocks[i] || !empty(i)) {
- finished = false;
- break;
- }
- }
- return finished;
- }
-
- @Override
- public long calculateMaxPeekMemory() {
- long maxPeekMemory = 0;
- long childrenMaxPeekMemory = 0;
- for (Operator child : children) {
- childrenMaxPeekMemory =
- Math.max(childrenMaxPeekMemory, maxPeekMemory +
child.calculateMaxPeekMemory());
- maxPeekMemory +=
- (child.calculateMaxReturnSize() +
child.calculateRetainedSizeAfterCallingNext());
- }
-
- maxPeekMemory += calculateMaxReturnSize();
- return Math.max(maxPeekMemory, childrenMaxPeekMemory);
- }
-
- @Override
- public long calculateMaxReturnSize() {
- // time + all value columns
- return maxReturnSize;
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
- for (Operator child : children) {
- long maxReturnSize = child.calculateMaxReturnSize();
- currentRetainedSize += (maxReturnSize +
child.calculateRetainedSizeAfterCallingNext());
- minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
- }
- // max cached TsBlock
- return currentRetainedSize - minChildReturnSize;
- }
-
- /**
- * If the tsBlock of columnIndex is null or has no more data in the tsBlock,
return true; else
- * return false;
- */
- private boolean empty(int columnIndex) {
- return inputTsBlocks[columnIndex] == null
- || inputTsBlocks[columnIndex].getPositionCount() ==
inputIndex[columnIndex];
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
index e458b4aff38..4d24c857edb 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/AscTimeComparator.java
@@ -16,11 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
public class AscTimeComparator implements TimeComparator {
- /** @return if order by time asc, return true if time <= endTime, otherwise
false */
+ /** return if order by time asc, return true if time <= endTime, otherwise
false. */
@Override
public boolean satisfyCurEndTime(long time, long endTime) {
return time <= endTime;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
index 7a00f2adf3b..38664d0cf5e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/ColumnMerger.java
@@ -16,16 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
-/** used to merge columns belonging to same series into one column */
+/** used to merge columns belonging to same series into one column. */
public interface ColumnMerger {
/**
+ * judge whether TsBlock at tsBlockIndex is empty.
+ *
* @param tsBlockIndex index
* @param inputTsBlocks input TsBlock array
* @param inputIndex current index for each input TsBlock and size of it is
equal to inputTsBlocks
@@ -39,7 +42,7 @@ public interface ColumnMerger {
/**
* merge columns belonging to same series into one column, merge until each
input column's time is
- * larger than currentEndTime
+ * larger than currentEndTime.
*
* @param inputTsBlocks all source TsBlocks, some of which will contain
source column
* @param inputIndex start index for each source TsBlock and size of it is
equal to inputTsBlocks,
@@ -61,7 +64,7 @@ public interface ColumnMerger {
/**
* merge columns belonging to same series into one column, merge just one
row whose time is equal
- * to currentTime
+ * to currentTime.
*
* @param inputTsBlocks all source TsBlocks, some of which will contain
source column
* @param inputIndex start index for each source TsBlock and size of it is
equal to inputTsBlocks,
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
index e13661823d9..5b1a097c503 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/DescTimeComparator.java
@@ -16,11 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
public class DescTimeComparator implements TimeComparator {
- /** @return if order by time desc, return true if time >= endTime, otherwise
false */
+ /** return if order by time desc, return true if time >= endTime, otherwise
false. */
@Override
public boolean satisfyCurEndTime(long time, long endTime) {
return time >= endTime;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
index c949c2990d0..efad0370c5d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
@@ -42,7 +42,11 @@ public class MergeSortComparator {
(SortKey sortKey) ->
sortKey.tsBlock.getTimeByIndex(sortKey.rowIndex))
.reversed();
- /** @param indexList -1 for time column */
+ private MergeSortComparator() {
+ // util class doesn't need constructor
+ }
+
+ /** -1 in index is for time column. */
public static Comparator<SortKey> getComparator(
List<SortItem> sortItemList, List<Integer> indexList, List<TSDataType>
dataTypeList) {
@@ -50,7 +54,9 @@ public class MergeSortComparator {
List<Comparator<SortKey>> list = new ArrayList<>(indexList.size());
for (int i = 0; i < indexList.size(); i++) {
int index = indexList.get(i);
- if (index == -2) continue;
+ if (index == -2) {
+ continue;
+ }
TSDataType dataType = dataTypeList.get(i);
boolean asc = sortItemList.get(i).getOrdering() == Ordering.ASC;
boolean nullFirst = sortItemList.get(i).getNullOrdering() ==
NullOrdering.FIRST;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
index 65943460341..ad98076ff8d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MultiColumnMerger.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -27,7 +28,7 @@ import
org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import java.util.List;
-/** has more than one input column, but these columns' time is overlapped */
+/** has more than one input column, but these columns' time is overlapped. */
public class MultiColumnMerger implements ColumnMerger {
private final List<InputLocation> inputLocations;
@@ -36,6 +37,7 @@ public class MultiColumnMerger implements ColumnMerger {
this.inputLocations = inputLocations;
}
+ @SuppressWarnings("squid:S3776")
@Override
public void mergeColumn(
TsBlock[] inputTsBlocks,
@@ -128,7 +130,6 @@ public class MultiColumnMerger implements ColumnMerger {
// value of current location's input column is not null
// here we only append value if there is no value appended before
and current value is
// null
- // TODO That means we choose first value as the final value if there
exist timestamp
// belonging to more than one DataRegion, we need to choose which
one is latest
if (!appendValue && !valueColumn.isNull(index)) {
columnBuilder.write(valueColumn, index);
@@ -138,13 +139,6 @@ public class MultiColumnMerger implements ColumnMerger {
index++;
// update the index after merging
updatedInputIndex[tsBlockIndex] = index;
- // we can never safely set appendValue to true and then break the
loop, because these
- // input
- // columns' time may be overlapped, we should increase each column's
index whose time is
- // equal to currentTime
- // if (appendValue) {
- // break;
- // }
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
index c338130b8a7..da55cd78151 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/NonOverlappedMultiColumnMerger.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -25,7 +26,7 @@ import
org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import java.util.List;
-/** has more than one input column, but these columns' time is not overlapped
*/
+/** has more than one input column, but these columns' time is not overlapped.
*/
public class NonOverlappedMultiColumnMerger implements ColumnMerger {
private final List<InputLocation> inputLocations;
@@ -36,7 +37,7 @@ public class NonOverlappedMultiColumnMerger implements
ColumnMerger {
private int index;
/**
- * these columns' time should never be overlapped
+ * these columns' time should never be overlapped.
*
* @param inputLocations The time order in TsBlock represented by
inputLocations should be
* incremented by timestamp if it is order by time asc, otherwise
decreased by timestamp if it
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
index 0c9f7e60e71..0b12a890dc8 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SingleColumnMerger.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -25,7 +26,7 @@ import
org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
-/** only has one input column */
+/** only has one input column. */
public class SingleColumnMerger implements ColumnMerger {
private final InputLocation location;
@@ -57,6 +58,18 @@ public class SingleColumnMerger implements ColumnMerger {
comparator);
}
+ @Override
+ public void mergeColumn(
+ TsBlock[] inputTsBlocks,
+ int[] inputIndex,
+ int[] updatedInputIndex,
+ long currentTime,
+ ColumnBuilder columnBuilder) {
+ mergeOneColumn(
+ inputTsBlocks, inputIndex, updatedInputIndex, currentTime,
columnBuilder, location);
+ }
+
+ @SuppressWarnings({"squid:S107", "squid:S3776"})
public static void mergeOneColumn(
TsBlock[] inputTsBlocks,
int[] inputIndex,
@@ -111,17 +124,6 @@ public class SingleColumnMerger implements ColumnMerger {
updatedInputIndex[tsBlockIndex] = index;
}
- @Override
- public void mergeColumn(
- TsBlock[] inputTsBlocks,
- int[] inputIndex,
- int[] updatedInputIndex,
- long currentTime,
- ColumnBuilder columnBuilder) {
- mergeOneColumn(
- inputTsBlocks, inputIndex, updatedInputIndex, currentTime,
columnBuilder, location);
- }
-
public static void mergeOneColumn(
TsBlock[] inputTsBlocks,
int[] inputIndex,
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SortKeyComparator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SortKeyComparator.java
index fa4a6a1a011..a8f4384e14d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SortKeyComparator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/SortKeyComparator.java
@@ -21,10 +21,9 @@ package
org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
import org.apache.iotdb.db.utils.datastructure.SortKey;
-import java.io.Serializable;
import java.util.Comparator;
-public class SortKeyComparator implements Comparator<SortKey>, Serializable {
+public class SortKeyComparator implements Comparator<SortKey> {
private final boolean nullFirst;
private final int index;
@@ -44,7 +43,11 @@ public class SortKeyComparator implements
Comparator<SortKey>, Serializable {
if (!o1IsNull && !o2IsNull) {
return originalComparator.compare(o1, o2);
} else if (o1IsNull) {
- return o2IsNull ? 0 : (nullFirst ? -1 : 1);
+ if (o2IsNull) {
+ return 0;
+ } else {
+ return nullFirst ? -1 : 1;
+ }
} else {
return nullFirst ? 1 : -1;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
index 846af9810db..d71678b7067 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/TimeComparator.java
@@ -16,16 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
public interface TimeComparator {
- /** @return true if time is satisfied with endTime, otherwise false */
+ /** return true if time is satisfied with endTime, otherwise false. */
boolean satisfyCurEndTime(long time, long endTime);
- /** @return min(time1, time2) if order by time asc, max(time1, time2) if
order by desc */
+ /** return min(time1, time2) if order by time asc, max(time1, time2) if
order by desc. */
long getCurrentEndTime(long time1, long time2);
- /** @return time < endTime if order by time asc, time > endTime if order by
desc */
+ /** return time < endTime if order by time asc, time > endTime if order by
desc. */
boolean canContinue(long time, long endTime);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 0ebf65484be..cd45d7138d2 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -84,7 +84,6 @@ import
org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.IntPrevi
import
org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.LongPreviousFill;
import
org.apache.iotdb.db.mpp.execution.operator.process.join.HorizontallyConcatOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
-import
org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
@@ -224,7 +223,6 @@ import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.Validate;
import java.io.File;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -1045,7 +1043,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getZoneId(),
expressionTypes,
node.getScanOrder() == Ordering.ASC);
- } catch (QueryProcessException | IOException e) {
+ } catch (QueryProcessException e) {
throw new RuntimeException(e);
}
}
@@ -1087,7 +1085,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
}
}
- // init UDTFContext;
+ // init UDTFContext
UDTFContext filterContext = new UDTFContext(node.getZoneId());
filterContext.constructUdfExecutors(new Expression[] {filterExpression});
@@ -1186,7 +1184,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
node.getZoneId(),
expressionTypes,
node.getScanOrder() == Ordering.ASC);
- } catch (QueryProcessException | IOException e) {
+ } catch (QueryProcessException e) {
throw new RuntimeException(e);
}
}
@@ -1762,7 +1760,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- TimeJoinOperator.class.getSimpleName());
+ RowBasedTimeJoinOperator.class.getSimpleName());
TimeComparator timeComparator =
node.getMergeOrder() == Ordering.ASC ? ASC_TIME_COMPARATOR :
DESC_TIME_COMPARATOR;
List<OutputColumn> outputColumns = generateOutputColumnsFromChildren(node);
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
deleted file mode 100644
index ef122f78b87..00000000000
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/TimeJoinOperatorTest.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.execution.operator;
-
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
-import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
-import
org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinOperator;
-import
org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
-import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
-import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
-import
org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
-import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
-import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
-import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
-import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import io.airlift.units.Duration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class TimeJoinOperatorTest {
- private static final String TIME_JOIN_OPERATOR_TEST_SG =
"root.TimeJoinOperatorTest";
- private final List<String> deviceIds = new ArrayList<>();
- private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
-
- private final List<TsFileResource> seqResources = new ArrayList<>();
- private final List<TsFileResource> unSeqResources = new ArrayList<>();
-
- @Before
- public void setUp() throws MetadataException, IOException,
WriteProcessException {
- SeriesReaderTestUtil.setUp(
- measurementSchemas, deviceIds, seqResources, unSeqResources,
TIME_JOIN_OPERATOR_TEST_SG);
- }
-
- @After
- public void tearDown() throws IOException {
- SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
- }
-
- @Test
- public void batchTest1() throws Exception {
- ExecutorService instanceNotificationExecutor =
- IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
- try {
- MeasurementPath measurementPath1 =
- new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0",
TSDataType.INT32);
- Set<String> allSensors = new HashSet<>();
- allSensors.add("sensor0");
- allSensors.add("sensor1");
- QueryId queryId = new QueryId("stub_query");
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
- FragmentInstanceStateMachine stateMachine =
- new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
- FragmentInstanceContext fragmentInstanceContext =
- createFragmentInstanceContext(instanceId, stateMachine);
- DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
- PlanNodeId planNodeId1 = new PlanNodeId("1");
- driverContext.addOperatorContext(1, planNodeId1,
SeriesScanOperator.class.getSimpleName());
- PlanNodeId planNodeId2 = new PlanNodeId("2");
- driverContext.addOperatorContext(2, planNodeId2,
SeriesScanOperator.class.getSimpleName());
- driverContext.addOperatorContext(
- 3, new PlanNodeId("3"),
RowBasedTimeJoinOperator.class.getSimpleName());
-
- SeriesScanOptions.Builder scanOptionsBuilder = new
SeriesScanOptions.Builder();
- scanOptionsBuilder.withAllSensors(allSensors);
- SeriesScanOperator seriesScanOperator1 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(0),
- planNodeId1,
- measurementPath1,
- Ordering.ASC,
- scanOptionsBuilder.build());
- seriesScanOperator1.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
- seriesScanOperator1
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- MeasurementPath measurementPath2 =
- new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1",
TSDataType.INT32);
- SeriesScanOperator seriesScanOperator2 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(1),
- planNodeId2,
- measurementPath2,
- Ordering.ASC,
- scanOptionsBuilder.build());
- seriesScanOperator2.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
- seriesScanOperator2
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- TimeJoinOperator timeJoinOperator =
- new TimeJoinOperator(
- driverContext.getOperatorContexts().get(2),
- Arrays.asList(seriesScanOperator1, seriesScanOperator2),
- Ordering.ASC,
- Arrays.asList(TSDataType.INT32, TSDataType.INT32),
- Arrays.asList(
- new SingleColumnMerger(new InputLocation(0, 0), new
AscTimeComparator()),
- new SingleColumnMerger(new InputLocation(1, 0), new
AscTimeComparator())),
- new AscTimeComparator());
- int count = 0;
- while (timeJoinOperator.hasNext()) {
- TsBlock tsBlock = timeJoinOperator.next();
- assertEquals(2, tsBlock.getValueColumnCount());
- assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
- assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
- for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
- assertEquals(count, tsBlock.getTimeByIndex(i));
- if ((long) count < 200) {
- assertEquals(20000 + (long) count, tsBlock.getColumn(0).getInt(i));
- assertEquals(20000 + (long) count, tsBlock.getColumn(1).getInt(i));
- } else if ((long) count < 260
- || ((long) count >= 300 && (long) count < 380)
- || (long) count >= 400) {
- assertEquals(10000 + (long) count, tsBlock.getColumn(0).getInt(i));
- assertEquals(10000 + (long) count, tsBlock.getColumn(1).getInt(i));
- } else {
- assertEquals(count, tsBlock.getColumn(0).getInt(i));
- assertEquals(count, tsBlock.getColumn(1).getInt(i));
- }
- }
- }
- assertEquals(500, count);
- } catch (IllegalPathException e) {
- e.printStackTrace();
- fail();
- } finally {
- instanceNotificationExecutor.shutdown();
- }
- }
-
- /** test time join with non-exist sensor */
- @Test
- public void batchTest2() throws Exception {
- ExecutorService instanceNotificationExecutor =
- IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
- try {
- MeasurementPath measurementPath1 =
- new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0",
TSDataType.INT32);
- Set<String> allSensors = new HashSet<>();
- allSensors.add("sensor0");
- allSensors.add("sensor1");
- allSensors.add("error_sensor");
- QueryId queryId = new QueryId("stub_query");
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
- FragmentInstanceStateMachine stateMachine =
- new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
- FragmentInstanceContext fragmentInstanceContext =
- createFragmentInstanceContext(instanceId, stateMachine);
- DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
- PlanNodeId planNodeId1 = new PlanNodeId("1");
- driverContext.addOperatorContext(1, planNodeId1,
SeriesScanOperator.class.getSimpleName());
- PlanNodeId planNodeId2 = new PlanNodeId("2");
- driverContext.addOperatorContext(2, planNodeId2,
SeriesScanOperator.class.getSimpleName());
- PlanNodeId planNodeId3 = new PlanNodeId("3");
- driverContext.addOperatorContext(3, planNodeId3,
SeriesScanOperator.class.getSimpleName());
- driverContext.addOperatorContext(
- 4, new PlanNodeId("4"), TimeJoinOperator.class.getSimpleName());
-
- SeriesScanOptions.Builder scanOptionsBuilder = new
SeriesScanOptions.Builder();
- scanOptionsBuilder.withAllSensors(allSensors);
- SeriesScanOperator seriesScanOperator1 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(0),
- planNodeId1,
- measurementPath1,
- Ordering.ASC,
- scanOptionsBuilder.build());
- seriesScanOperator1.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
- seriesScanOperator1
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- MeasurementPath measurementPath2 =
- new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1",
TSDataType.INT32);
- SeriesScanOperator seriesScanOperator2 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(1),
- planNodeId2,
- measurementPath2,
- Ordering.ASC,
- scanOptionsBuilder.build());
- seriesScanOperator2.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
- seriesScanOperator2
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- MeasurementPath measurementPath3 =
- new MeasurementPath(
- TIME_JOIN_OPERATOR_TEST_SG + ".device0.error_sensor",
TSDataType.INT32);
- SeriesScanOperator seriesScanOperator3 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(2),
- planNodeId3,
- measurementPath3,
- Ordering.ASC,
- scanOptionsBuilder.build());
- seriesScanOperator3.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
- seriesScanOperator3
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- TimeJoinOperator timeJoinOperator =
- new TimeJoinOperator(
- driverContext.getOperatorContexts().get(3),
- Arrays.asList(seriesScanOperator1, seriesScanOperator2,
seriesScanOperator3),
- Ordering.ASC,
- Arrays.asList(TSDataType.INT32, TSDataType.INT32,
TSDataType.INT32),
- Arrays.asList(
- new SingleColumnMerger(new InputLocation(0, 0), new
AscTimeComparator()),
- new SingleColumnMerger(new InputLocation(1, 0), new
AscTimeComparator()),
- new SingleColumnMerger(new InputLocation(2, 0), new
AscTimeComparator())),
- new AscTimeComparator());
- int count = 0;
- while (timeJoinOperator.hasNext()) {
- TsBlock tsBlock = timeJoinOperator.next();
- assertEquals(3, tsBlock.getValueColumnCount());
- assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
- assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
- assertTrue(tsBlock.getColumn(2) instanceof RunLengthEncodedColumn);
- for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
- assertEquals(count, tsBlock.getTimeByIndex(i));
- assertTrue(tsBlock.getColumn(2).isNull(i));
- if ((long) count < 200) {
- assertEquals(20000 + (long) count, tsBlock.getColumn(0).getInt(i));
- assertEquals(20000 + (long) count, tsBlock.getColumn(1).getInt(i));
- } else if ((long) count < 260
- || ((long) count >= 300 && (long) count < 380)
- || (long) count >= 400) {
- assertEquals(10000 + (long) count, tsBlock.getColumn(0).getInt(i));
- assertEquals(10000 + (long) count, tsBlock.getColumn(1).getInt(i));
- } else {
- assertEquals(count, tsBlock.getColumn(0).getInt(i));
- assertEquals(count, tsBlock.getColumn(1).getInt(i));
- }
- }
- }
- assertEquals(500, count);
- } catch (IllegalPathException e) {
- e.printStackTrace();
- fail();
- } finally {
- instanceNotificationExecutor.shutdown();
- }
- }
-
- /** test time join with non-exist sensor and order by time desc */
- @Test
- public void batchTest3() throws Exception {
- ExecutorService instanceNotificationExecutor =
- IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
- try {
- MeasurementPath measurementPath1 =
- new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor0",
TSDataType.INT32);
- Set<String> allSensors = new HashSet<>();
- allSensors.add("sensor0");
- allSensors.add("sensor1");
- allSensors.add("error_sensor");
- QueryId queryId = new QueryId("stub_query");
- FragmentInstanceId instanceId =
- new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
- FragmentInstanceStateMachine stateMachine =
- new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
- FragmentInstanceContext fragmentInstanceContext =
- createFragmentInstanceContext(instanceId, stateMachine);
- DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
- PlanNodeId planNodeId1 = new PlanNodeId("1");
- driverContext.addOperatorContext(1, planNodeId1,
SeriesScanOperator.class.getSimpleName());
- PlanNodeId planNodeId2 = new PlanNodeId("2");
- driverContext.addOperatorContext(2, planNodeId2,
SeriesScanOperator.class.getSimpleName());
- PlanNodeId planNodeId3 = new PlanNodeId("3");
- driverContext.addOperatorContext(3, planNodeId3,
SeriesScanOperator.class.getSimpleName());
- driverContext.addOperatorContext(
- 4, new PlanNodeId("4"), TimeJoinOperator.class.getSimpleName());
-
- SeriesScanOptions.Builder scanOptionsBuilder = new
SeriesScanOptions.Builder();
- scanOptionsBuilder.withAllSensors(allSensors);
- SeriesScanOperator seriesScanOperator1 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(0),
- planNodeId1,
- measurementPath1,
- Ordering.DESC,
- scanOptionsBuilder.build());
- seriesScanOperator1.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
- seriesScanOperator1
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- MeasurementPath measurementPath2 =
- new MeasurementPath(TIME_JOIN_OPERATOR_TEST_SG + ".device0.sensor1",
TSDataType.INT32);
- SeriesScanOperator seriesScanOperator2 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(1),
- planNodeId2,
- measurementPath2,
- Ordering.DESC,
- scanOptionsBuilder.build());
- seriesScanOperator2.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
- seriesScanOperator2
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- MeasurementPath measurementPath3 =
- new MeasurementPath(
- TIME_JOIN_OPERATOR_TEST_SG + ".device0.error_sensor",
TSDataType.INT32);
- SeriesScanOperator seriesScanOperator3 =
- new SeriesScanOperator(
- driverContext.getOperatorContexts().get(2),
- planNodeId3,
- measurementPath3,
- Ordering.ASC,
- scanOptionsBuilder.build());
- seriesScanOperator3.initQueryDataSource(new
QueryDataSource(seqResources, unSeqResources));
- seriesScanOperator3
- .getOperatorContext()
- .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
-
- TimeJoinOperator timeJoinOperator =
- new TimeJoinOperator(
- driverContext.getOperatorContexts().get(3),
- Arrays.asList(seriesScanOperator1, seriesScanOperator2,
seriesScanOperator3),
- Ordering.DESC,
- Arrays.asList(TSDataType.INT32, TSDataType.INT32,
TSDataType.INT32),
- Arrays.asList(
- new SingleColumnMerger(new InputLocation(0, 0), new
DescTimeComparator()),
- new SingleColumnMerger(new InputLocation(1, 0), new
DescTimeComparator()),
- new SingleColumnMerger(new InputLocation(2, 0), new
DescTimeComparator())),
- new DescTimeComparator());
- int count = 499;
- while (timeJoinOperator.hasNext()) {
- TsBlock tsBlock = timeJoinOperator.next();
- assertEquals(3, tsBlock.getValueColumnCount());
- assertTrue(tsBlock.getColumn(0) instanceof IntColumn);
- assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
- assertTrue(tsBlock.getColumn(2) instanceof RunLengthEncodedColumn);
- for (int i = 0; i < tsBlock.getPositionCount(); i++, count--) {
- assertEquals(count, tsBlock.getTimeByIndex(i));
- assertTrue(tsBlock.getColumn(2).isNull(i));
- if ((long) count < 200) {
- assertEquals(20000 + (long) count, tsBlock.getColumn(0).getInt(i));
- assertEquals(20000 + (long) count, tsBlock.getColumn(1).getInt(i));
- } else if ((long) count < 260
- || ((long) count >= 300 && (long) count < 380)
- || (long) count >= 400) {
- assertEquals(10000 + (long) count, tsBlock.getColumn(0).getInt(i));
- assertEquals(10000 + (long) count, tsBlock.getColumn(1).getInt(i));
- } else {
- assertEquals(count, tsBlock.getColumn(0).getInt(i));
- assertEquals(count, tsBlock.getColumn(1).getInt(i));
- }
- }
- }
- assertEquals(-1, count);
- } catch (IllegalPathException e) {
- e.printStackTrace();
- fail();
- } finally {
- instanceNotificationExecutor.shutdown();
- }
- }
-}