This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-3722
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-3722 by this push:
new e59d77b49c fix bug
e59d77b49c is described below
commit e59d77b49c0cc2ed3ae1882fcbf6f353dadbfb2c
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jul 5 23:03:50 2022 +0800
fix bug
---
.../iotdb/db/it/query/IoTDBNullValueFillIT.java | 2 -
.../operator/process/LinearFillOperator.java | 30 +++++++--
.../operator/process/fill/ILinearFill.java | 13 ++--
.../process/fill/identity/IdentityLinearFill.java | 7 +-
.../process/fill/linear/DoubleLinearFill.java | 5 --
.../process/fill/linear/FloatLinearFill.java | 5 --
.../process/fill/linear/IntLinearFill.java | 5 --
.../operator/process/fill/linear/LinearFill.java | 75 ++++++++--------------
.../process/fill/linear/LongLinearFill.java | 5 --
.../operator/process/merge/AscTimeComparator.java | 5 --
.../operator/process/merge/DescTimeComparator.java | 5 --
.../operator/process/merge/TimeComparator.java | 5 --
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 15 ++---
.../execution/operator/LinearFillOperatorTest.java | 49 ++++++--------
14 files changed, 85 insertions(+), 141 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
index e70fb266a8..5437854454 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.itbase.category.ClusterIT;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -226,7 +225,6 @@ public class IoTDBNullValueFillIT {
}
@Test
- @Ignore // TODO bug fix
public void linearFillAlignByDeviceTest() {
String[] retArray =
new String[] {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
index 125cda1eae..fcae23ce7c 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java
@@ -42,6 +42,10 @@ public class LinearFillOperator implements ProcessOperator {
private final int outputColumnCount;
// TODO need to spill it to disk if it consumes too much memory
private final List<TsBlock> cachedTsBlock;
+
+ private final List<Long> cachedRowIndex;
+
+ private long currentRowIndex = 0;
// next TsBlock Index for each Column
private final int[] nextTsBlockIndex;
@@ -60,6 +64,7 @@ public class LinearFillOperator implements ProcessOperator {
this.child = requireNonNull(child, "child operator is null");
this.outputColumnCount = fillArray.length;
this.cachedTsBlock = new ArrayList<>();
+ this.cachedRowIndex = new ArrayList<>();
this.nextTsBlockIndex = new int[outputColumnCount];
Arrays.fill(this.nextTsBlockIndex, 1);
this.canCallNext = false;
@@ -88,19 +93,21 @@ public class LinearFillOperator implements ProcessOperator {
return nextTsBlock;
} else { // otherwise, we cache it
cachedTsBlock.add(nextTsBlock);
+ cachedRowIndex.add(currentRowIndex);
+ currentRowIndex += nextTsBlock.getPositionCount();
}
}
TsBlock originTsBlock = cachedTsBlock.get(0);
- long currentEndTime = originTsBlock.getEndTime();
+ long currentEndRowIndex = cachedRowIndex.get(0) +
originTsBlock.getPositionCount() - 1;
// Step 1: judge whether we can fill current TsBlock, if TsBlock that we
can get is not enough,
// we just return null
for (int columnIndex = 0; columnIndex < outputColumnCount; columnIndex++) {
// current valueColumn can't be filled using current information
if (fillArray[columnIndex].needPrepareForNext(
- currentEndTime, originTsBlock.getColumn(columnIndex))) {
+ currentEndRowIndex, originTsBlock.getColumn(columnIndex))) {
// current cached TsBlock is not enough to fill this column
- while (!isCachedTsBlockEnough(columnIndex, currentEndTime)) {
+ while (!isCachedTsBlockEnough(columnIndex, currentEndRowIndex)) {
// if we failed to get next TsBlock
if (!tryToGetNextTsBlock()) {
// there is no more TsBlock, so we have to fill this Column
@@ -118,9 +125,12 @@ public class LinearFillOperator implements ProcessOperator
{
}
// Step 2: fill current TsBlock
originTsBlock = cachedTsBlock.remove(0);
+ long startRowIndex = cachedRowIndex.remove(0);
Column[] columns = new Column[outputColumnCount];
for (int i = 0; i < outputColumnCount; i++) {
- columns[i] = fillArray[i].fill(originTsBlock.getTimeColumn(),
originTsBlock.getColumn(i));
+ columns[i] =
+ fillArray[i].fill(
+ originTsBlock.getTimeColumn(), originTsBlock.getColumn(i),
startRowIndex);
}
TsBlock result =
new TsBlock(originTsBlock.getPositionCount(),
originTsBlock.getTimeColumn(), columns);
@@ -154,17 +164,21 @@ public class LinearFillOperator implements
ProcessOperator {
* Judge whether we can use current cached TsBlock to fill Column
*
* @param columnIndex index for column which need to be filled
- * @param currentEndTime endTime of column which need to be filled
+ * @param currentEndRowIndex row index for endTime of column which need to
be filled
* @return true if current cached TsBlock is enough to fill Column at
columnIndex, otherwise
* false.
*/
- private boolean isCachedTsBlockEnough(int columnIndex, long currentEndTime) {
+ private boolean isCachedTsBlockEnough(int columnIndex, long
currentEndRowIndex) {
// next TsBlock has already been in the cachedTsBlock
while (nextTsBlockIndex[columnIndex] < cachedTsBlock.size()) {
TsBlock nextTsBlock = cachedTsBlock.get(nextTsBlockIndex[columnIndex]);
+ long startRowIndex = cachedRowIndex.get(nextTsBlockIndex[columnIndex]);
nextTsBlockIndex[columnIndex]++;
if (fillArray[columnIndex].prepareForNext(
- currentEndTime, nextTsBlock.getTimeColumn(),
nextTsBlock.getColumn(columnIndex))) {
+ startRowIndex,
+ currentEndRowIndex,
+ nextTsBlock.getTimeColumn(),
+ nextTsBlock.getColumn(columnIndex))) {
return true;
}
}
@@ -184,6 +198,8 @@ public class LinearFillOperator implements ProcessOperator {
return false;
} else { // otherwise, we cache it
cachedTsBlock.add(nextTsBlock);
+ cachedRowIndex.add(currentRowIndex);
+ currentRowIndex += nextTsBlock.getPositionCount();
return true;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java
index 7bdd899ad3..d0edfc5961 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java
@@ -29,26 +29,29 @@ public interface ILinearFill {
*
* @param timeColumn TimeColumn of valueColumn
* @param valueColumn valueColumn that need to be filled
+ * @param currentRowIndex current row index for start time in timeColumn
* @return Value Column that has been filled
*/
- Column fill(TimeColumn timeColumn, Column valueColumn);
+ Column fill(TimeColumn timeColumn, Column valueColumn, long currentRowIndex);
/**
- * @param time end time of current valueColumn that need to be filled
+ * @param rowIndex row index for end time of current valueColumn that need
to be filled
* @param valueColumn valueColumn that need to be filled
* @return true if valueColumn can't be filled using current information,
and we need to get next
* TsBlock and then call prepareForNext. false if valueColumn can be
filled using current
* information, and we can directly call fill() function
*/
- boolean needPrepareForNext(long time, Column valueColumn);
+ boolean needPrepareForNext(long rowIndex, Column valueColumn);
/**
- * @param time end time of current valueColumn that need to be filled
+ * @param startRowIndex row index for start time of nextValueColumn
+ * @param endRowIndex row index for end time of current valueColumn that
need to be filled
* @param nextTimeColumn TimeColumn of next TsBlock
* @param nextValueColumn Value Column of next TsBlock
* @return true if we get enough information to fill current column, and we
can stop getting next
* TsBlock and calling prepareForNext. false if we still don't get
enough information to fill
* current column, and still need to keep getting next TsBlock and then
call prepareForNext
*/
- boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column
nextValueColumn);
+ boolean prepareForNext(
+ long startRowIndex, long endRowIndex, TimeColumn nextTimeColumn, Column
nextValueColumn);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
index 9b1cb93a5a..34f329baf0 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java
@@ -25,17 +25,18 @@ import
org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
public class IdentityLinearFill implements ILinearFill {
@Override
- public Column fill(TimeColumn timeColumn, Column valueColumn) {
+ public Column fill(TimeColumn timeColumn, Column valueColumn, long
currentRowIndex) {
return valueColumn;
}
@Override
- public boolean needPrepareForNext(long time, Column valueColumn) {
+ public boolean needPrepareForNext(long rowIndex, Column valueColumn) {
return false;
}
@Override
- public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column
nextValueColumn) {
+ public boolean prepareForNext(
+ long startRowIndex, long endRowIndex, TimeColumn nextTimeColumn, Column
nextValueColumn) {
throw new IllegalArgumentException(
"We won't call prepareForNext in IdentityLinearFill, because
needPrepareForNext() method will always return false.");
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
index ca8f558647..52a228b1e4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
@@ -34,10 +33,6 @@ public class DoubleLinearFill extends LinearFill {
private double nextValueInCurrentColumn;
- public DoubleLinearFill(boolean ascending, TimeComparator timeComparator) {
- super(ascending, timeComparator);
- }
-
@Override
void fillValue(Column column, int index, Object array) {
((double[]) array)[index] = column.getDouble(index);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
index 803c55ec8f..a32c60d61e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
@@ -34,10 +33,6 @@ public class FloatLinearFill extends LinearFill {
private float nextValueInCurrentColumn;
- public FloatLinearFill(boolean ascending, TimeComparator timeComparator) {
- super(ascending, timeComparator);
- }
-
@Override
void fillValue(Column column, int index, Object array) {
((float[]) array)[index] = column.getFloat(index);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
index 4876ed23b8..baf4806551 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
@@ -34,10 +33,6 @@ public class IntLinearFill extends LinearFill {
private int nextValueInCurrentColumn;
- public IntLinearFill(boolean ascending, TimeComparator timeComparator) {
- super(ascending, timeComparator);
- }
-
@Override
void fillValue(Column column, int index, Object array) {
((int[]) array)[index] = column.getInt(index);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
index b813661058..238fb91137 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
@@ -34,31 +33,16 @@ import static
com.google.common.base.Preconditions.checkArgument;
*/
public abstract class LinearFill implements ILinearFill {
- private final TimeComparator timeComparator;
-
// whether previous value is null
protected boolean previousIsNull = true;
- // time of next value
- protected long nextTime;
-
- protected long nextTimeInCurrentColumn;
- public LinearFill(boolean ascending, TimeComparator timeComparator) {
- this.nextTime = ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
- this.nextTimeInCurrentColumn = ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
- this.timeComparator = timeComparator;
- }
+ // next row index which is not null
+ private long nextRowIndex = -1;
+ // next row index in current column which is not null
+ private long nextRowIndexInCurrentColumn = -1;
- /**
- * Before we call this method, we need to make sure the nextValue has been
prepared or noMoreNext
- * has been set to true
- *
- * @param timeColumn TimeColumn of valueColumn
- * @param valueColumn valueColumn that need to be filled
- * @return Value Column that has been filled
- */
@Override
- public Column fill(TimeColumn timeColumn, Column valueColumn) {
+ public Column fill(TimeColumn timeColumn, Column valueColumn, long
startRowIndex) {
int size = valueColumn.getPositionCount();
// if this valueColumn is empty, just return itself;
if (size == 0) {
@@ -76,11 +60,13 @@ public abstract class LinearFill implements ILinearFill {
// if its values are all null
if (valueColumn instanceof RunLengthEncodedColumn) {
// previous value is null or next value is null, we just return
NULL_VALUE_BLOCK
- if (previousIsNull || timeComparator.inFillBound(nextTime,
timeColumn.getStartTime())) {
+ if (previousIsNull || nextRowIndex < startRowIndex) {
return new RunLengthEncodedColumn(createNullValueColumn(), size);
} else {
prepareForNextValueInCurrentColumn(
- timeColumn.getEndTime(), timeColumn.getPositionCount() - 1,
timeColumn, valueColumn);
+ startRowIndex + timeColumn.getPositionCount() - 1,
+ timeColumn.getPositionCount(),
+ valueColumn);
return new RunLengthEncodedColumn(createFilledValueColumn(), size);
}
} else {
@@ -92,10 +78,10 @@ public abstract class LinearFill implements ILinearFill {
for (int i = 0; i < size; i++) {
// current value is null, we need to fill it
if (valueColumn.isNull(i)) {
- long currentTime = timeColumn.getLong(i);
- prepareForNextValueInCurrentColumn(currentTime, i + 1, timeColumn,
valueColumn);
+ long currentRowIndex = startRowIndex + i;
+ prepareForNextValueInCurrentColumn(currentRowIndex, i + 1,
valueColumn);
// we don't fill it, if either previous value or next value is null
- if (previousIsNull || nextIsNull(currentTime)) {
+ if (previousIsNull || nextIsNull(currentRowIndex)) {
isNull[i] = true;
hasNullValue = true;
} else {
@@ -115,65 +101,56 @@ public abstract class LinearFill implements ILinearFill {
}
/**
- * @param time end time of current valueColumn that need to be filled
+ * @param rowIndex row index for end time of current valueColumn that need to be filled
* @param valueColumn valueColumn that need to be filled
* @return true if valueColumn can't be filled using current information,
and we need to get next
* TsBlock and then call prepareForNext. false if valueColumn can be
filled using current
* information, and we can directly call fill() function
*/
@Override
- public boolean needPrepareForNext(long time, Column valueColumn) {
- return timeComparator.inFillBound(nextTime, time)
- && valueColumn.isNull(valueColumn.getPositionCount() - 1);
+ public boolean needPrepareForNext(long rowIndex, Column valueColumn) {
+ return nextRowIndex < rowIndex &&
valueColumn.isNull(valueColumn.getPositionCount() - 1);
}
- /**
- * @param time end time of current valueColumn that need to be filled
- * @param nextTimeColumn TimeColumn of next TsBlock
- * @param nextValueColumn Value Column of next TsBlock
- * @return true if we get enough information to fill current column, and we
can stop getting next
- * TsBlock and calling prepareForNext. false if we still don't get
enough information to fill
- * current column, and still need to keep getting next TsBlock and then
call prepareForNext
- */
@Override
- public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column
nextValueColumn) {
+ public boolean prepareForNext(
+ long startRowIndex, long endRowIndex, TimeColumn nextTimeColumn, Column
nextValueColumn) {
checkArgument(
- nextTimeColumn.getPositionCount() > 0
- && timeComparator.inFillBound(time, nextTimeColumn.getLong(0)),
+ nextTimeColumn.getPositionCount() > 0 && endRowIndex < startRowIndex,
"nextColumn's time should be greater than current time");
- if (timeComparator.satisfyCurEndTime(time, nextTime)) {
+ if (endRowIndex <= nextRowIndex) {
return true;
}
for (int i = 0; i < nextValueColumn.getPositionCount(); i++) {
if (!nextValueColumn.isNull(i)) {
updateNextValue(nextValueColumn, i);
- this.nextTime = nextTimeColumn.getLong(i);
+ this.nextRowIndex = startRowIndex + i;
return true;
}
}
return false;
}
- private boolean nextIsNull(long time) {
- return timeComparator.satisfyCurEndTime(nextTimeInCurrentColumn, time);
+ private boolean nextIsNull(long rowIndex) {
+ return nextRowIndexInCurrentColumn <= rowIndex;
}
private void prepareForNextValueInCurrentColumn(
- long time, int startIndex, TimeColumn timeColumn, Column valueColumn) {
- if (timeComparator.satisfyCurEndTime(time, nextTimeInCurrentColumn)) {
+ long currentRowIndex, int startIndex, Column valueColumn) {
+ if (currentRowIndex <= nextRowIndexInCurrentColumn) {
return;
}
for (int i = startIndex; i < valueColumn.getPositionCount(); i++) {
if (!valueColumn.isNull(i)) {
- this.nextTimeInCurrentColumn = timeColumn.getLong(i);
+ this.nextRowIndexInCurrentColumn = currentRowIndex + (i - startIndex +
1);
updateNextValueInCurrentColumn(valueColumn, i);
return;
}
}
// current column's value is not enough for filling, we should use value
of next Column
- this.nextTimeInCurrentColumn = this.nextTime;
+ this.nextRowIndexInCurrentColumn = this.nextRowIndex;
updateNextValueInCurrentColumn();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
index 5f6986ad2e..04dba1613a 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
@@ -34,10 +33,6 @@ public class LongLinearFill extends LinearFill {
private long nextValueInCurrentColumn;
- public LongLinearFill(boolean ascending, TimeComparator timeComparator) {
- super(ascending, timeComparator);
- }
-
@Override
void fillValue(Column column, int index, Object array) {
((long[]) array)[index] = column.getLong(index);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
index 4aa723ad97..95b7316844 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
@@ -30,9 +30,4 @@ public class AscTimeComparator implements TimeComparator {
public long getCurrentEndTime(long time1, long time2) {
return Math.min(time1, time2);
}
-
- @Override
- public boolean inFillBound(long time, long timeBound) {
- return time < timeBound;
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
index 495cd2852a..f53c97d8fe 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
@@ -30,9 +30,4 @@ public class DescTimeComparator implements TimeComparator {
public long getCurrentEndTime(long time1, long time2) {
return Math.max(time1, time2);
}
-
- @Override
- public boolean inFillBound(long time, long timeBound) {
- return time > timeBound;
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
index a2f2c076da..db017f6186 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
@@ -25,9 +25,4 @@ public interface TimeComparator {
/** @return min(time1, time2) if order by time asc, max(time1, time2) if
order by desc */
long getCurrentEndTime(long time1, long time2);
-
- /**
- * @return true if time < timeBound && order by time asc, time > timeBound
&& order by time desc
- */
- boolean inFillBound(long time, long timeBound);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 939f84544c..343c328eea 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -671,8 +671,7 @@ public class LocalExecutionPlanner {
context.getNextOperatorId(),
node.getPlanNodeId(),
LinearFillOperator.class.getSimpleName()),
- getLinearFill(
- inputColumns, inputDataTypes, node.getScanOrder() ==
OrderBy.TIMESTAMP_ASC),
+ getLinearFill(inputColumns, inputDataTypes),
child);
default:
throw new IllegalArgumentException("Unknown fill policy: " +
fillPolicy);
@@ -742,23 +741,21 @@ public class LocalExecutionPlanner {
return previousFill;
}
- private ILinearFill[] getLinearFill(
- int inputColumns, List<TSDataType> inputDataTypes, boolean ascending) {
+ private ILinearFill[] getLinearFill(int inputColumns, List<TSDataType>
inputDataTypes) {
ILinearFill[] linearFill = new ILinearFill[inputColumns];
- TimeComparator timeComparator = ascending ? ASC_TIME_COMPARATOR :
DESC_TIME_COMPARATOR;
for (int i = 0; i < inputColumns; i++) {
switch (inputDataTypes.get(i)) {
case INT32:
- linearFill[i] = new IntLinearFill(ascending, timeComparator);
+ linearFill[i] = new IntLinearFill();
break;
case INT64:
- linearFill[i] = new LongLinearFill(ascending, timeComparator);
+ linearFill[i] = new LongLinearFill();
break;
case FLOAT:
- linearFill[i] = new FloatLinearFill(ascending, timeComparator);
+ linearFill[i] = new FloatLinearFill();
break;
case DOUBLE:
- linearFill[i] = new DoubleLinearFill(ascending, timeComparator);
+ linearFill[i] = new DoubleLinearFill();
break;
case BOOLEAN:
case TEXT:
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
index 23ca161fa2..56c2fe6a1a 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
@@ -29,9 +29,6 @@ import
org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
import
org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityLinearFill;
import
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
import
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
-import
org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
-import
org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
-import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -65,14 +62,12 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- boolean ascending = true;
- TimeComparator timeComparator = new AscTimeComparator();
LinearFill[] fillArray =
new LinearFill[] {
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator)
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill()
};
LinearFillOperator fillOperator =
new LinearFillOperator(
@@ -265,14 +260,12 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- boolean ascending = false;
- TimeComparator timeComparator = new DescTimeComparator();
LinearFill[] fillArray =
new LinearFill[] {
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator)
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill()
};
LinearFillOperator fillOperator =
new LinearFillOperator(
@@ -465,14 +458,12 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- boolean ascending = true;
- TimeComparator timeComparator = new AscTimeComparator();
LinearFill[] fillArray =
new LinearFill[] {
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator)
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill()
};
LinearFillOperator fillOperator =
new LinearFillOperator(
@@ -665,14 +656,12 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- boolean ascending = false;
- TimeComparator timeComparator = new DescTimeComparator();
LinearFill[] fillArray =
new LinearFill[] {
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator),
- new FloatLinearFill(ascending, timeComparator)
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill(),
+ new FloatLinearFill()
};
LinearFillOperator fillOperator =
new LinearFillOperator(
@@ -865,8 +854,7 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- LinearFill[] fillArray =
- new LinearFill[] {new FloatLinearFill(true, new
AscTimeComparator())};
+ LinearFill[] fillArray = new LinearFill[] {new FloatLinearFill()};
LinearFillOperator fillOperator =
new LinearFillOperator(
fragmentInstanceContext.getOperatorContexts().get(0),
@@ -971,8 +959,7 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- LinearFill[] fillArray =
- new LinearFill[] {new FloatLinearFill(false, new
DescTimeComparator())};
+ LinearFill[] fillArray = new LinearFill[] {new FloatLinearFill()};
LinearFillOperator fillOperator =
new LinearFillOperator(
fragmentInstanceContext.getOperatorContexts().get(0),