This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 780db8361f [IOTDB-2849] Delete groupByLevelOperator and add more
comments to timeJoinOperator (#5614)
780db8361f is described below
commit 780db8361f0d5c87ec372ef574318e07b0d33016
Author: Xiangwei Wei <[email protected]>
AuthorDate: Tue May 3 20:19:14 2022 +0800
[IOTDB-2849] Delete groupByLevelOperator and add more comments to
timeJoinOperator (#5614)
---
.../operator/process/GroupByLevelOperator.java | 57 ----------------------
.../operator/process/TimeJoinOperator.java | 44 +++++++++++------
.../iotdb/tsfile/read/common/block/TsBlock.java | 1 +
3 files changed, 29 insertions(+), 73 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/GroupByLevelOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/GroupByLevelOperator.java
deleted file mode 100644
index 90904142fa..0000000000
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/GroupByLevelOperator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.execution.operator.process;
-
-import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-public class GroupByLevelOperator implements ProcessOperator {
-
- @Override
- public OperatorContext getOperatorContext() {
- return null;
- }
-
- @Override
- public ListenableFuture<Void> isBlocked() {
- return ProcessOperator.super.isBlocked();
- }
-
- @Override
- public TsBlock next() {
- return null;
- }
-
- @Override
- public boolean hasNext() {
- return false;
- }
-
- @Override
- public void close() throws Exception {
- ProcessOperator.super.close();
- }
-
- @Override
- public boolean isFinished() {
- return false;
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
index bba5f5ce29..0fe2bb2873 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TimeJoinOperator.java
@@ -41,8 +41,9 @@ public class TimeJoinOperator implements ProcessOperator {
private final List<Operator> children;
- private final int inputCount;
+ private final int inputOperatorsCount;
+ /** Cached TsBlock from each child operator; only one block is cached per child for now. */
private final TsBlock[] inputTsBlocks;
/** start index for each input TsBlocks and size of it is equal to
inputTsBlocks */
@@ -51,15 +52,20 @@ public class TimeJoinOperator implements ProcessOperator {
/** used to record current index for input TsBlocks after merging */
private final int[] shadowInputIndex;
+ /**
+ * Indicates whether the i-th child operator has no more TsBlocks to produce. This operator is
+ * finished once every element of noMoreTsBlocks[] is true and every cached block in
+ * inputTsBlocks[] has been consumed completely.
+ */
private final boolean[] noMoreTsBlocks;
private final TimeSelector timeSelector;
- private final int columnCount;
+ private final int outputColumnCount;
/**
* this field indicates each data type for output columns(not including time
column) of
- * TimeJoinOperator its size should be equal to columnCount
+ * TimeJoinOperator its size should be equal to outputColumnCount
*/
private final List<TSDataType> dataTypes;
@@ -83,13 +89,14 @@ public class TimeJoinOperator implements ProcessOperator {
"child size of TimeJoinOperator should be larger than 0");
this.operatorContext = operatorContext;
this.children = children;
- this.inputCount = children.size();
- this.inputTsBlocks = new TsBlock[this.inputCount];
- this.inputIndex = new int[this.inputCount];
- this.shadowInputIndex = new int[this.inputCount];
- this.noMoreTsBlocks = new boolean[this.inputCount];
- this.timeSelector = new TimeSelector(this.inputCount << 1,
OrderBy.TIMESTAMP_ASC == mergeOrder);
- this.columnCount = dataTypes.size();
+ this.inputOperatorsCount = children.size();
+ this.inputTsBlocks = new TsBlock[this.inputOperatorsCount];
+ this.inputIndex = new int[this.inputOperatorsCount];
+ this.shadowInputIndex = new int[this.inputOperatorsCount];
+ this.noMoreTsBlocks = new boolean[this.inputOperatorsCount];
+ this.timeSelector =
+ new TimeSelector(this.inputOperatorsCount << 1, OrderBy.TIMESTAMP_ASC
== mergeOrder);
+ this.outputColumnCount = dataTypes.size();
this.dataTypes = dataTypes;
this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
this.mergers = mergers;
@@ -103,7 +110,7 @@ public class TimeJoinOperator implements ProcessOperator {
@Override
public ListenableFuture<Void> isBlocked() {
- for (int i = 0; i < inputCount; i++) {
+ for (int i = 0; i < inputOperatorsCount; i++) {
if (!noMoreTsBlocks[i] && empty(i)) {
ListenableFuture<Void> blocked = children.get(i).isBlocked();
if (!blocked.isDone()) {
@@ -121,7 +128,7 @@ public class TimeJoinOperator implements ProcessOperator {
// TsBlocks order by asc/desc
long currentEndTime = 0;
boolean init = false;
- for (int i = 0; i < inputCount; i++) {
+ for (int i = 0; i < inputOperatorsCount; i++) {
if (!noMoreTsBlocks[i] && empty(i) && children.get(i).hasNext()) {
inputIndex[i] = 0;
inputTsBlocks[i] = children.get(i).next();
@@ -154,7 +161,7 @@ public class TimeJoinOperator implements ProcessOperator {
tsBlockBuilder.declarePosition();
}
- for (int i = 0; i < columnCount; i++) {
+ for (int i = 0; i < outputColumnCount; i++) {
ColumnMerger merger = mergers.get(i);
merger.mergeColumn(
inputTsBlocks,
@@ -166,7 +173,7 @@ public class TimeJoinOperator implements ProcessOperator {
}
// update inputIndex using shadowInputIndex
- System.arraycopy(shadowInputIndex, 0, inputIndex, 0, inputCount);
+ System.arraycopy(shadowInputIndex, 0, inputIndex, 0, inputOperatorsCount);
return tsBlockBuilder.build();
}
@@ -176,7 +183,7 @@ public class TimeJoinOperator implements ProcessOperator {
if (finished) {
return false;
}
- for (int i = 0; i < inputCount; i++) {
+ for (int i = 0; i < inputOperatorsCount; i++) {
if (!empty(i)) {
return true;
} else if (!noMoreTsBlocks[i]) {
@@ -204,7 +211,8 @@ public class TimeJoinOperator implements ProcessOperator {
return true;
}
finished = true;
- for (int i = 0; i < inputCount; i++) {
+
+ for (int i = 0; i < inputOperatorsCount; i++) {
// has more tsBlock output from children[i] or has cached tsBlock in
inputTsBlocks[i]
if (!noMoreTsBlocks[i] || !empty(i)) {
finished = false;
@@ -214,6 +222,10 @@ public class TimeJoinOperator implements ProcessOperator {
return finished;
}
+ /**
+ * Returns true if the cached TsBlock at columnIndex is null or has been fully consumed;
+ * otherwise returns false.
+ */
private boolean empty(int columnIndex) {
return inputTsBlocks[columnIndex] == null
|| inputTsBlocks[columnIndex].getPositionCount() ==
inputIndex[columnIndex];
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index d3ffe65ff3..8034bfb189 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -59,6 +59,7 @@ public class TsBlock {
private final Column[] valueColumns;
+ /** Number of rows in the current TsBlock. */
private final int positionCount;
private volatile long retainedSizeInBytes = -1;