This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e4987bde94 [IOTDB-3724] Fix Incorrect result when querying with linear
fill & order by time desc (#6586)
e4987bde94 is described below
commit e4987bde9434c5bf3e1791c08922f26a8dd3d584
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jul 5 08:53:55 2022 +0800
[IOTDB-3724] Fix Incorrect result when querying with linear fill & order by
time desc (#6586)
---
.../iotdb/db/it/query/IoTDBNullValueFillIT.java | 2 -
.../process/fill/linear/DoubleLinearFill.java | 5 +
.../process/fill/linear/FloatLinearFill.java | 5 +
.../process/fill/linear/IntLinearFill.java | 5 +
.../operator/process/fill/linear/LinearFill.java | 27 +-
.../process/fill/linear/LongLinearFill.java | 5 +
.../operator/process/merge/AscTimeComparator.java | 5 +
.../operator/process/merge/DescTimeComparator.java | 5 +
.../operator/process/merge/TimeComparator.java | 5 +
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 15 +-
.../execution/operator/LinearFillOperatorTest.java | 532 ++++++++++++++++++++-
11 files changed, 586 insertions(+), 25 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
index 59264a60ad..46f4f9e5b7 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.itbase.category.ClusterIT;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -218,7 +217,6 @@ public class IoTDBNullValueFillIT {
}
@Test
- @Ignore // TODO fix IOTDB-3724
public void linearFillTest() {
String[] expectedHeader =
new String[] {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
index 52a228b1e4..ca8f558647 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
@@ -33,6 +34,10 @@ public class DoubleLinearFill extends LinearFill {
private double nextValueInCurrentColumn;
+ public DoubleLinearFill(boolean ascending, TimeComparator timeComparator) {
+ super(ascending, timeComparator);
+ }
+
@Override
void fillValue(Column column, int index, Object array) {
((double[]) array)[index] = column.getDouble(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
index a32c60d61e..803c55ec8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
@@ -33,6 +34,10 @@ public class FloatLinearFill extends LinearFill {
private float nextValueInCurrentColumn;
+ public FloatLinearFill(boolean ascending, TimeComparator timeComparator) {
+ super(ascending, timeComparator);
+ }
+
@Override
void fillValue(Column column, int index, Object array) {
((float[]) array)[index] = column.getFloat(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
index baf4806551..4876ed23b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
@@ -33,6 +34,10 @@ public class IntLinearFill extends LinearFill {
private int nextValueInCurrentColumn;
+ public IntLinearFill(boolean ascending, TimeComparator timeComparator) {
+ super(ascending, timeComparator);
+ }
+
@Override
void fillValue(Column column, int index, Object array) {
((int[]) array)[index] = column.getInt(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
index 0e26d2baf5..92685b9c38 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
@@ -28,17 +29,25 @@ import static com.google.common.base.Preconditions.checkArgument;
* The result of Linear Fill functions at timestamp "T" is calculated by performing a linear fitting
* method on two time series values, one is at the closest timestamp before T, and the other is at
* the closest timestamp after T. Linear Fill function calculation only supports numeric types
- * including int, double and float.
+ * including long, int, double and float.
*/
public abstract class LinearFill {
+ private final TimeComparator timeComparator;
+
// whether previous value is null
protected boolean previousIsNull = true;
// time of next value
- protected long nextTime = Long.MIN_VALUE;
+ protected long nextTime;
protected long nextTimeInCurrentColumn;
+ public LinearFill(boolean ascending, TimeComparator timeComparator) {
+ this.nextTime = ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
+ this.nextTimeInCurrentColumn = ascending ? Long.MIN_VALUE : Long.MAX_VALUE;
+ this.timeComparator = timeComparator;
+ }
+
/**
* Before we call this method, we need to make sure the nextValue has been prepared or noMoreNext
* has been set to true
@@ -65,7 +74,7 @@ public abstract class LinearFill {
// if its values are all null
if (valueColumn instanceof RunLengthEncodedColumn) {
// previous value is null or next value is null, we just return NULL_VALUE_BLOCK
- if (previousIsNull || nextTime < timeColumn.getStartTime()) {
+ if (previousIsNull || timeComparator.inFillBound(nextTime, timeColumn.getStartTime())) {
return new RunLengthEncodedColumn(createNullValueColumn(), size);
} else {
prepareForNextValueInCurrentColumn(
@@ -111,7 +120,8 @@ public abstract class LinearFill {
* information, and we can directly call fill() function
*/
public boolean needPrepareForNext(long time, Column valueColumn) {
- return time > nextTime && valueColumn.isNull(valueColumn.getPositionCount() - 1);
+ return timeComparator.inFillBound(nextTime, time)
+ && valueColumn.isNull(valueColumn.getPositionCount() - 1);
}
/**
@@ -124,9 +134,10 @@ public abstract class LinearFill {
*/
public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column nextValueColumn) {
checkArgument(
- nextTimeColumn.getPositionCount() > 0 && nextTimeColumn.getLong(0) > time,
+ nextTimeColumn.getPositionCount() > 0
+ && timeComparator.inFillBound(time, nextTimeColumn.getLong(0)),
"nextColumn's time should be greater than current time");
- if (time <= nextTime) {
+ if (timeComparator.satisfyCurEndTime(time, nextTime)) {
return true;
}
@@ -141,12 +152,12 @@ public abstract class LinearFill {
}
private boolean nextIsNull(long time) {
- return nextTimeInCurrentColumn <= time;
+ return timeComparator.satisfyCurEndTime(nextTimeInCurrentColumn, time);
}
private void prepareForNextValueInCurrentColumn(
long time, int startIndex, TimeColumn timeColumn, Column valueColumn) {
- if (time <= nextTimeInCurrentColumn) {
+ if (timeComparator.satisfyCurEndTime(time, nextTimeInCurrentColumn)) {
return;
}
for (int i = startIndex; i < valueColumn.getPositionCount(); i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
index 04dba1613a..5f6986ad2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
@@ -33,6 +34,10 @@ public class LongLinearFill extends LinearFill {
private long nextValueInCurrentColumn;
+ public LongLinearFill(boolean ascending, TimeComparator timeComparator) {
+ super(ascending, timeComparator);
+ }
+
@Override
void fillValue(Column column, int index, Object array) {
((long[]) array)[index] = column.getLong(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
index 95b7316844..4aa723ad97 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java
@@ -30,4 +30,9 @@ public class AscTimeComparator implements TimeComparator {
public long getCurrentEndTime(long time1, long time2) {
return Math.min(time1, time2);
}
+
+ @Override
+ public boolean inFillBound(long time, long timeBound) {
+ return time < timeBound;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
index f53c97d8fe..495cd2852a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java
@@ -30,4 +30,9 @@ public class DescTimeComparator implements TimeComparator {
public long getCurrentEndTime(long time1, long time2) {
return Math.max(time1, time2);
}
+
+ @Override
+ public boolean inFillBound(long time, long timeBound) {
+ return time > timeBound;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
index db017f6186..a2f2c076da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java
@@ -25,4 +25,9 @@ public interface TimeComparator {
/** @return min(time1, time2) if order by time asc, max(time1, time2) if order by desc */
long getCurrentEndTime(long time1, long time2);
+
+ /**
+ * @return true if time < timeBound && order by time asc, time > timeBound && order by time desc
+ */
+ boolean inFillBound(long time, long timeBound);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 522b502d37..e4f34d448e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -665,7 +665,8 @@ public class LocalExecutionPlanner {
context.getNextOperatorId(),
node.getPlanNodeId(),
LinearFillOperator.class.getSimpleName()),
- getLinearFill(inputColumns, inputDataTypes),
+ getLinearFill(
+ inputColumns, inputDataTypes, node.getScanOrder() == OrderBy.TIMESTAMP_ASC),
child);
default:
throw new IllegalArgumentException("Unknown fill policy: " + fillPolicy);
@@ -731,21 +732,23 @@ public class LocalExecutionPlanner {
return previousFill;
}
- private LinearFill[] getLinearFill(int inputColumns, List<TSDataType> inputDataTypes) {
+ private LinearFill[] getLinearFill(
+ int inputColumns, List<TSDataType> inputDataTypes, boolean ascending) {
LinearFill[] linearFill = new LinearFill[inputColumns];
+ TimeComparator timeComparator = ascending ? ASC_TIME_COMPARATOR : DESC_TIME_COMPARATOR;
for (int i = 0; i < inputColumns; i++) {
switch (inputDataTypes.get(i)) {
case INT32:
- linearFill[i] = new IntLinearFill();
+ linearFill[i] = new IntLinearFill(ascending, timeComparator);
break;
case INT64:
- linearFill[i] = new LongLinearFill();
+ linearFill[i] = new LongLinearFill(ascending, timeComparator);
break;
case FLOAT:
- linearFill[i] = new FloatLinearFill();
+ linearFill[i] = new FloatLinearFill(ascending, timeComparator);
break;
case DOUBLE:
- linearFill[i] = new DoubleLinearFill();
+ linearFill[i] = new DoubleLinearFill(ascending, timeComparator);
break;
case BOOLEAN:
case TEXT:
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
index 2fb9898418..9852707056 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java
@@ -27,6 +27,9 @@ import
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
import
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill;
import
org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill;
+import
org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
+import
org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -59,12 +62,14 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
+ boolean ascending = true;
+ TimeComparator timeComparator = new AscTimeComparator();
LinearFill[] fillArray =
new LinearFill[] {
- new FloatLinearFill(),
- new FloatLinearFill(),
- new FloatLinearFill(),
- new FloatLinearFill()
+ new FloatLinearFill(ascending, timeComparator),
+ new FloatLinearFill(ascending, timeComparator),
+ new FloatLinearFill(ascending, timeComparator),
+ new FloatLinearFill(ascending, timeComparator)
};
LinearFillOperator fillOperator =
new LinearFillOperator(
@@ -241,6 +246,206 @@ public class LinearFillOperatorTest {
}
}
+ @Test
+ public void batchLinearFillTest1OrderByDesc() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+ boolean ascending = false;
+ TimeComparator timeComparator = new DescTimeComparator();
+ LinearFill[] fillArray =
+ new LinearFill[] {
+ new FloatLinearFill(ascending, timeComparator),
+ new FloatLinearFill(ascending, timeComparator),
+ new FloatLinearFill(ascending, timeComparator),
+ new FloatLinearFill(ascending, timeComparator)
+ };
+ LinearFillOperator fillOperator =
+ new LinearFillOperator(
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ fillArray,
+ new Operator() {
+ private int index = 0;
+ private final float[][][] value =
+ new float[][][] {
+ {
+ {1.0f, 0.0f, 3.0f, 4.0f},
+ {11.0f, 12.0f, 13.0f, 0.0f},
+ {21.0f, 22.0f, 0.0f, 0.0f},
+ {0.0f, 32.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 43.0f, 0.0f}
+ },
+ {
+ {51.0f, 0.0f, 53.0f, 0.0f},
+ {61.0f, 62.0f, 63.0f, 0.0f},
+ {71.0f, 72.0f, 0.0f, 74.0f},
+ {0.0f, 82.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 93.0f, 0.0f}
+ },
+ {
+ {101.0f, 0.0f, 103.0f, 0.0f},
+ {111.0f, 112.0f, 113.0f, 114.0f},
+ {121.0f, 122.0f, 0.0f, 124.0f},
+ {0.0f, 132.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 143.0f, 0.0f}
+ }
+ };
+ final boolean[][][] isNull =
+ new boolean[][][] {
+ {
+ {false, true, false, false},
+ {false, false, false, true},
+ {false, false, true, true},
+ {true, false, true, true},
+ {true, true, false, true}
+ },
+ {
+ {false, true, false, true},
+ {false, false, false, true},
+ {false, false, true, false},
+ {true, false, true, true},
+ {true, true, false, true}
+ },
+ {
+ {false, true, false, true},
+ {false, false, false, false},
+ {false, false, true, false},
+ {true, false, true, true},
+ {true, true, false, true}
+ }
+ };
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ ImmutableList.of(
+ TSDataType.FLOAT,
+ TSDataType.FLOAT,
+ TSDataType.FLOAT,
+ TSDataType.FLOAT));
+ for (int i = 0; i < 5; i++) {
+ builder.getTimeColumnBuilder().writeLong((4 - i) + (2 -
index) * 5L);
+ for (int j = 0; j < 4; j++) {
+ if (isNull[index][i][j]) {
+ builder.getColumnBuilder(j).appendNull();
+ } else {
+ builder.getColumnBuilder(j).writeFloat(value[index][i][j]);
+ }
+ }
+ builder.declarePosition();
+ }
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 3;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return index >= 3;
+ }
+ });
+
+ float[][][] res =
+ new float[][][] {
+ {
+ {1.0f, 0.0f, 3.0f, 4.0f},
+ {11.0f, 12.0f, 13.0f, 39.0f},
+ {21.0f, 22.0f, 28.0f, 39.0f},
+ {36.0f, 32.0f, 28.0f, 39.0f},
+ {36.0f, 47.0f, 43.0f, 39.0f}
+ },
+ {
+ {51.0f, 47.0f, 53.0f, 39.0f},
+ {61.0f, 62.0f, 63.0f, 39.0f},
+ {71.0f, 72.0f, 78.0f, 74.0f},
+ {86.0f, 82.0f, 78.0f, 94.0f},
+ {86.0f, 97.0f, 93.0f, 94.0f}
+ },
+ {
+ {101.0f, 97.0f, 103.0f, 94.0f},
+ {111.0f, 112.0f, 113.0f, 114.0f},
+ {121.0f, 122.0f, 128.0f, 124.0f},
+ {0.0f, 132.0f, 128.0f, 0.0f},
+ {0.0f, 0.0f, 143.0f, 0.0f}
+ }
+ };
+ boolean[][][] isNull =
+ new boolean[][][] {
+ {
+ {false, true, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false}
+ },
+ {
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false}
+ },
+ {
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {true, false, false, true},
+ {true, true, false, true}
+ }
+ };
+
+ boolean[] nullBlock = new boolean[] {true, false, false, false};
+ int nullBlockIndex = 0;
+ int count = 0;
+ while (fillOperator.hasNext()) {
+ TsBlock block = fillOperator.next();
+ assertEquals(nullBlock[nullBlockIndex++], block == null);
+ if (block == null) {
+ continue;
+ }
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ long expectedTime = (block.getPositionCount() - i - 1) + (res.length
- count - 1) * 5L;
+ assertEquals(expectedTime, block.getTimeByIndex(i));
+ for (int j = 0; j < 4; j++) {
+ assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+ if (!isNull[count][i][j]) {
+ assertEquals(res[count][i][j], block.getColumn(j).getFloat(i),
0.00001f);
+ }
+ }
+ }
+ count++;
+ }
+
+ assertTrue(fillOperator.isFinished());
+ assertEquals(res.length, count);
+ assertEquals(nullBlock.length, nullBlockIndex);
+
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
@Test
public void batchLinearFillTest2() {
ExecutorService instanceNotificationExecutor =
@@ -257,12 +462,14 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
+ boolean ascending = true;
+ TimeComparator timeComparator = new AscTimeComparator();
LinearFill[] fillArray =
new LinearFill[] {
- new FloatLinearFill(),
- new FloatLinearFill(),
- new FloatLinearFill(),
- new FloatLinearFill()
+ new FloatLinearFill(ascending, timeComparator),
+ new FloatLinearFill(ascending, timeComparator),
+ new FloatLinearFill(ascending, timeComparator),
+ new FloatLinearFill(ascending, timeComparator)
};
LinearFillOperator fillOperator =
new LinearFillOperator(
@@ -439,6 +646,206 @@ public class LinearFillOperatorTest {
}
}
+ @Test
+ public void batchLinearFillTest2OrderByDesc() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+ boolean ascending = false;
+ TimeComparator timeComparator = new DescTimeComparator();
+ LinearFill[] fillArray =
+ new LinearFill[] {
+ new FloatLinearFill(ascending, timeComparator),
+ new FloatLinearFill(ascending, timeComparator),
+ new FloatLinearFill(ascending, timeComparator),
+ new FloatLinearFill(ascending, timeComparator)
+ };
+ LinearFillOperator fillOperator =
+ new LinearFillOperator(
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ fillArray,
+ new Operator() {
+ private int index = 0;
+ private final float[][][] value =
+ new float[][][] {
+ {
+ {1.0f, 0.0f, 3.0f, 4.0f},
+ {11.0f, 12.0f, 13.0f, 0.0f},
+ {21.0f, 22.0f, 0.0f, 0.0f},
+ {0.0f, 32.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 0.0f, 0.0f}
+ },
+ {
+ {51.0f, 0.0f, 0.0f, 0.0f},
+ {61.0f, 62.0f, 0.0f, 0.0f},
+ {71.0f, 72.0f, 0.0f, 74.0f},
+ {0.0f, 82.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 0.0f, 0.0f}
+ },
+ {
+ {101.0f, 0.0f, 103.0f, 0.0f},
+ {111.0f, 112.0f, 0.0f, 114.0f},
+ {121.0f, 122.0f, 0.0f, 124.0f},
+ {0.0f, 132.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 0.0f, 0.0f}
+ }
+ };
+ final boolean[][][] isNull =
+ new boolean[][][] {
+ {
+ {false, true, false, false},
+ {false, false, false, true},
+ {false, false, true, true},
+ {true, false, true, true},
+ {true, true, true, true}
+ },
+ {
+ {false, true, true, true},
+ {false, false, true, true},
+ {false, false, true, false},
+ {true, false, true, true},
+ {true, true, true, true}
+ },
+ {
+ {false, true, false, true},
+ {false, false, true, false},
+ {false, false, true, false},
+ {true, false, true, true},
+ {true, true, true, true}
+ }
+ };
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlockBuilder builder =
+ new TsBlockBuilder(
+ ImmutableList.of(
+ TSDataType.FLOAT,
+ TSDataType.FLOAT,
+ TSDataType.FLOAT,
+ TSDataType.FLOAT));
+ for (int i = 0; i < 5; i++) {
+ builder.getTimeColumnBuilder().writeLong((4 - i) + (2 -
index) * 5L);
+ for (int j = 0; j < 4; j++) {
+ if (isNull[index][i][j]) {
+ builder.getColumnBuilder(j).appendNull();
+ } else {
+ builder.getColumnBuilder(j).writeFloat(value[index][i][j]);
+ }
+ }
+ builder.declarePosition();
+ }
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 3;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return index >= 3;
+ }
+ });
+
+ int count = 0;
+ float[][][] res =
+ new float[][][] {
+ {
+ {1.0f, 0.0f, 3.0f, 4.0f},
+ {11.0f, 12.0f, 13.0f, 39.0f},
+ {21.0f, 22.0f, 58.0f, 39.0f},
+ {36.0f, 32.0f, 58.0f, 39.0f},
+ {36.0f, 47.0f, 58.0f, 39.0f}
+ },
+ {
+ {51.0f, 47.0f, 58.0f, 39.0f},
+ {61.0f, 62.0f, 58.0f, 39.0f},
+ {71.0f, 72.0f, 58.0f, 74.0f},
+ {86.0f, 82.0f, 58.0f, 94.0f},
+ {86.0f, 97.0f, 58.0f, 94.0f}
+ },
+ {
+ {101.0f, 97.0f, 103.0f, 94.0f},
+ {111.0f, 112.0f, 0.0f, 114.0f},
+ {121.0f, 122.0f, 0.0f, 124.0f},
+ {0.0f, 132.0f, 0.0f, 0.0f},
+ {0.0f, 0.0f, 0.0f, 0.0f}
+ }
+ };
+ boolean[][][] isNull =
+ new boolean[][][] {
+ {
+ {false, true, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false}
+ },
+ {
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false},
+ {false, false, false, false}
+ },
+ {
+ {false, false, false, false},
+ {false, false, true, false},
+ {false, false, true, false},
+ {true, false, true, true},
+ {true, true, true, true}
+ }
+ };
+
+ boolean[] nullBlock = new boolean[] {true, true, false, false, false};
+ int nullBlockIndex = 0;
+ while (fillOperator.hasNext()) {
+ TsBlock block = fillOperator.next();
+ assertEquals(nullBlock[nullBlockIndex++], block == null);
+ if (block == null) {
+ continue;
+ }
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ long expectedTime = (block.getPositionCount() - i - 1) + (res.length
- count - 1) * 5L;
+ assertEquals(expectedTime, block.getTimeByIndex(i));
+ for (int j = 0; j < 4; j++) {
+ assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+ if (!isNull[count][i][j]) {
+ assertEquals(res[count][i][j], block.getColumn(j).getFloat(i),
0.00001f);
+ }
+ }
+ }
+ count++;
+ }
+
+ assertTrue(fillOperator.isFinished());
+ assertEquals(res.length, count);
+ assertEquals(nullBlock.length, nullBlockIndex);
+
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
@Test
public void batchLinearFillTest3() {
ExecutorService instanceNotificationExecutor =
@@ -455,7 +862,8 @@ public class LinearFillOperatorTest {
fragmentInstanceContext.addOperatorContext(
1, planNodeId1, LinearFillOperator.class.getSimpleName());
- LinearFill[] fillArray = new LinearFill[] {new FloatLinearFill()};
+ LinearFill[] fillArray =
+ new LinearFill[] {new FloatLinearFill(true, new
AscTimeComparator())};
LinearFillOperator fillOperator =
new LinearFillOperator(
fragmentInstanceContext.getOperatorContexts().get(0),
@@ -543,4 +951,110 @@ public class LinearFillOperatorTest {
instanceNotificationExecutor.shutdown();
}
}
+
+ @Test
+ public void batchLinearFillTest3OrderByDesc() {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, LinearFillOperator.class.getSimpleName());
+
+ LinearFill[] fillArray =
+ new LinearFill[] {new FloatLinearFill(false, new
DescTimeComparator())};
+ LinearFillOperator fillOperator =
+ new LinearFillOperator(
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ fillArray,
+ new Operator() {
+ private int index = 0;
+ private final float[][][] value =
+ new float[][][] {
+ {{0.0f}}, {{2.0f}}, {{3.0f}}, {{4.0f}}, {{0.0f}},
{{0.0f}}, {{0.0f}}
+ };
+ final boolean[][][] isNull =
+ new boolean[][][] {
+ {{true}}, {{false}}, {{false}}, {{false}}, {{true}},
{{true}}, {{true}}
+ };
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return null;
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlockBuilder builder = new
TsBlockBuilder(ImmutableList.of(TSDataType.FLOAT));
+ for (int i = 0; i < 1; i++) {
+ builder.getTimeColumnBuilder().writeLong(i + (6 - index));
+ for (int j = 0; j < 1; j++) {
+ if (isNull[index][i][j]) {
+ builder.getColumnBuilder(j).appendNull();
+ } else {
+ builder.getColumnBuilder(j).writeFloat(value[index][i][j]);
+ }
+ }
+ builder.declarePosition();
+ }
+ index++;
+ return builder.build();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return index < 7;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return index >= 7;
+ }
+ });
+
+ int count = 0;
+ float[][][] res =
+ new float[][][] {{{0.0f}}, {{2.0f}}, {{3.0f}}, {{4.0f}}, {{0.0f}},
{{0.0f}}, {{0.0f}}};
+ boolean[][][] isNull =
+ new boolean[][][] {
+ {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, {{true}},
{{true}}
+ };
+
+ boolean[] nullBlock =
+ new boolean[] {true, false, false, false, false, true, true, true,
false, false, false};
+ int nullBlockIndex = 0;
+ while (fillOperator.hasNext()) {
+ TsBlock block = fillOperator.next();
+ assertEquals(nullBlock[nullBlockIndex++], block == null);
+ if (block == null) {
+ continue;
+ }
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ long expectedTime = (block.getPositionCount() - i - 1) + (res.length
- count - 1);
+ assertEquals(expectedTime, block.getTimeByIndex(i));
+ for (int j = 0; j < 1; j++) {
+ assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i));
+ if (!isNull[count][i][j]) {
+ assertEquals(res[count][i][j], block.getColumn(j).getFloat(i),
0.00001f);
+ }
+ }
+ }
+ count++;
+ }
+
+ assertTrue(fillOperator.isFinished());
+ assertEquals(res.length, count);
+ assertEquals(nullBlock.length, nullBlockIndex);
+
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
}