This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-3724 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 51e7250a09a2f632f122528fbfd0562a6e4393fb Author: JackieTien97 <[email protected]> AuthorDate: Mon Jul 4 18:55:31 2022 +0800 [IOTDB-3724] Fix Incorrect result when querying with linear fill & order by time desc --- .../iotdb/db/it/query/IoTDBNullValueFillIT.java | 2 - .../process/fill/linear/DoubleLinearFill.java | 5 + .../process/fill/linear/FloatLinearFill.java | 5 + .../process/fill/linear/IntLinearFill.java | 5 + .../operator/process/fill/linear/LinearFill.java | 27 +- .../process/fill/linear/LongLinearFill.java | 5 + .../operator/process/merge/AscTimeComparator.java | 5 + .../operator/process/merge/DescTimeComparator.java | 5 + .../operator/process/merge/TimeComparator.java | 5 + .../db/mpp/plan/planner/LocalExecutionPlanner.java | 14 +- .../execution/operator/LinearFillOperatorTest.java | 532 ++++++++++++++++++++- 11 files changed, 585 insertions(+), 25 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java index 59264a60ad..46f4f9e5b7 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java @@ -25,7 +25,6 @@ import org.apache.iotdb.itbase.category.ClusterIT; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -218,7 +217,6 @@ public class IoTDBNullValueFillIT { } @Test - @Ignore // TODO fix IOTDB-3724 public void linearFillTest() { String[] expectedHeader = new String[] { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java index 52a228b1e4..ca8f558647 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/DoubleLinearFill.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn; import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder; @@ -33,6 +34,10 @@ public class DoubleLinearFill extends LinearFill { private double nextValueInCurrentColumn; + public DoubleLinearFill(boolean ascending, TimeComparator timeComparator) { + super(ascending, timeComparator); + } + @Override void fillValue(Column column, int index, Object array) { ((double[]) array)[index] = column.getDouble(index); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java index a32c60d61e..803c55ec8f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/FloatLinearFill.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn; import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder; @@ -33,6 +34,10 @@ public class FloatLinearFill extends LinearFill { private float nextValueInCurrentColumn; + public FloatLinearFill(boolean ascending, TimeComparator timeComparator) { + super(ascending, timeComparator); + } + @Override void fillValue(Column column, int index, Object array) { ((float[]) array)[index] = column.getFloat(index); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java index baf4806551..4876ed23b8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/IntLinearFill.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.IntColumn; import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder; @@ -33,6 +34,10 @@ public class IntLinearFill extends LinearFill { private int nextValueInCurrentColumn; + public IntLinearFill(boolean ascending, TimeComparator timeComparator) { + super(ascending, timeComparator); + } + @Override void fillValue(Column column, int index, Object array) { ((int[]) array)[index] = column.getInt(index); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java index 0e26d2baf5..92685b9c38 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; @@ -28,17 +29,25 @@ import static com.google.common.base.Preconditions.checkArgument; * The result of Linear Fill functions at timestamp "T" is calculated by performing a linear fitting * method on two time series values, one is at the closest timestamp before T, and the other is at * the closest timestamp after T. Linear Fill function calculation only supports numeric types - * including int, double and float. + * including long, int, double and float. */ public abstract class LinearFill { + private final TimeComparator timeComparator; + // whether previous value is null protected boolean previousIsNull = true; // time of next value - protected long nextTime = Long.MIN_VALUE; + protected long nextTime; protected long nextTimeInCurrentColumn; + public LinearFill(boolean ascending, TimeComparator timeComparator) { + this.nextTime = ascending ? Long.MIN_VALUE : Long.MAX_VALUE; + this.nextTimeInCurrentColumn = ascending ? Long.MIN_VALUE : Long.MAX_VALUE; + this.timeComparator = timeComparator; + } + /** * Before we call this method, we need to make sure the nextValue has been prepared or noMoreNext * has been set to true @@ -65,7 +74,7 @@ public abstract class LinearFill { // if its values are all null if (valueColumn instanceof RunLengthEncodedColumn) { // previous value is null or next value is null, we just return NULL_VALUE_BLOCK - if (previousIsNull || nextTime < timeColumn.getStartTime()) { + if (previousIsNull || timeComparator.inFillBound(nextTime, timeColumn.getStartTime())) { return new RunLengthEncodedColumn(createNullValueColumn(), size); } else { prepareForNextValueInCurrentColumn( @@ -111,7 +120,8 @@ public abstract class LinearFill { * information, and we can directly call fill() function */ public boolean needPrepareForNext(long time, Column valueColumn) { - return time > nextTime && valueColumn.isNull(valueColumn.getPositionCount() - 1); + return timeComparator.inFillBound(nextTime, time) + && valueColumn.isNull(valueColumn.getPositionCount() - 1); } /** @@ -124,9 +134,10 @@ public abstract class LinearFill { */ public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column nextValueColumn) { checkArgument( - nextTimeColumn.getPositionCount() > 0 && nextTimeColumn.getLong(0) > time, + nextTimeColumn.getPositionCount() > 0 + && timeComparator.inFillBound(time, nextTimeColumn.getLong(0)), "nextColumn's time should be greater than current time"); - if (time <= nextTime) { + if (timeComparator.satisfyCurEndTime(time, nextTime)) { return true; } @@ -141,12 +152,12 @@ public abstract class LinearFill { } private boolean nextIsNull(long time) { - return nextTimeInCurrentColumn <= time; + return timeComparator.satisfyCurEndTime(nextTimeInCurrentColumn, time); } private void prepareForNextValueInCurrentColumn( long time, int startIndex, TimeColumn timeColumn, Column valueColumn) { - if (time <= nextTimeInCurrentColumn) { + if (timeComparator.satisfyCurEndTime(time, nextTimeInCurrentColumn)) { return; } for (int i = startIndex; i < valueColumn.getPositionCount(); i++) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java index 04dba1613a..5f6986ad2e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LongLinearFill.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.LongColumn; import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder; @@ -33,6 +34,10 @@ public class LongLinearFill extends LinearFill { private long nextValueInCurrentColumn; + public LongLinearFill(boolean ascending, TimeComparator timeComparator) { + super(ascending, timeComparator); + } + @Override void fillValue(Column column, int index, Object array) { ((long[]) array)[index] = column.getLong(index); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java index 95b7316844..4aa723ad97 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/AscTimeComparator.java @@ -30,4 +30,9 @@ public class AscTimeComparator implements TimeComparator { public long getCurrentEndTime(long time1, long time2) { return Math.min(time1, time2); } + + @Override + public boolean inFillBound(long time, long timeBound) { + return time < timeBound; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java index f53c97d8fe..495cd2852a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/DescTimeComparator.java @@ -30,4 +30,9 @@ public class DescTimeComparator implements TimeComparator { public long getCurrentEndTime(long time1, long time2) { return Math.max(time1, time2); } + + @Override + public boolean inFillBound(long time, long timeBound) { + return time > timeBound; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java index db017f6186..a2f2c076da 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/merge/TimeComparator.java @@ -25,4 +25,9 @@ public interface TimeComparator { /** @return min(time1, time2) if order by time asc, max(time1, time2) if order by desc */ long getCurrentEndTime(long time1, long time2); + + /** + * @return true if time < timeBound && order by time asc, time > timeBound && order by time desc + */ + boolean inFillBound(long time, long timeBound); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index 522b502d37..2659a6fb86 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -665,7 +665,7 @@ public class LocalExecutionPlanner { context.getNextOperatorId(), node.getPlanNodeId(), LinearFillOperator.class.getSimpleName()), - getLinearFill(inputColumns, inputDataTypes), + getLinearFill(inputColumns, inputDataTypes, false), child); default: throw new IllegalArgumentException("Unknown fill policy: " + fillPolicy); @@ -731,21 +731,23 @@ public class LocalExecutionPlanner { return previousFill; } - private LinearFill[] getLinearFill(int inputColumns, List<TSDataType> inputDataTypes) { + private LinearFill[] getLinearFill( + int inputColumns, List<TSDataType> inputDataTypes, boolean ascending) { LinearFill[] linearFill = new LinearFill[inputColumns]; + TimeComparator timeComparator = ascending ? ASC_TIME_COMPARATOR : DESC_TIME_COMPARATOR; for (int i = 0; i < inputColumns; i++) { switch (inputDataTypes.get(i)) { case INT32: - linearFill[i] = new IntLinearFill(); + linearFill[i] = new IntLinearFill(ascending, timeComparator); break; case INT64: - linearFill[i] = new LongLinearFill(); + linearFill[i] = new LongLinearFill(ascending, timeComparator); break; case FLOAT: - linearFill[i] = new FloatLinearFill(); + linearFill[i] = new FloatLinearFill(ascending, timeComparator); break; case DOUBLE: - linearFill[i] = new DoubleLinearFill(); + linearFill[i] = new DoubleLinearFill(ascending, timeComparator); break; case BOOLEAN: case TEXT: diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java index 2fb9898418..d44d72ec30 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java @@ -27,6 +27,9 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator; import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator; +import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -59,12 +62,14 @@ public class LinearFillOperatorTest { fragmentInstanceContext.addOperatorContext( 1, planNodeId1, LinearFillOperator.class.getSimpleName()); + boolean ascending = true; + TimeComparator timeComparator = new AscTimeComparator(); LinearFill[] fillArray = new LinearFill[] { - new FloatLinearFill(), - new FloatLinearFill(), - new FloatLinearFill(), - new FloatLinearFill() + new FloatLinearFill(ascending, timeComparator), + new FloatLinearFill(ascending, timeComparator), + new FloatLinearFill(ascending, timeComparator), + new FloatLinearFill(ascending, timeComparator) }; LinearFillOperator fillOperator = new LinearFillOperator( @@ -241,6 +246,206 @@ public class LinearFillOperatorTest { } } + @Test + public void batchLinearFillTest1OrderByDesc() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId1, LinearFillOperator.class.getSimpleName()); + + boolean ascending = false; + TimeComparator timeComparator = new DescTimeComparator(); + LinearFill[] fillArray = + new LinearFill[] { + new FloatLinearFill(ascending, timeComparator), + new FloatLinearFill(ascending, timeComparator), + new FloatLinearFill(ascending, timeComparator), + new FloatLinearFill(ascending, timeComparator) + }; + LinearFillOperator fillOperator = + new LinearFillOperator( + fragmentInstanceContext.getOperatorContexts().get(0), + fillArray, + new Operator() { + private int index = 0; + private final float[][][] value = + new float[][][] { + { + {1.0f, 0.0f, 3.0f, 4.0f}, + {11.0f, 12.0f, 13.0f, 0.0f}, + {21.0f, 22.0f, 0.0f, 0.0f}, + {0.0f, 32.0f, 0.0f, 0.0f}, + {0.0f, 0.0f, 43.0f, 0.0f} + }, + { + {51.0f, 0.0f, 53.0f, 0.0f}, + {61.0f, 62.0f, 63.0f, 0.0f}, + {71.0f, 72.0f, 0.0f, 74.0f}, + {0.0f, 82.0f, 0.0f, 0.0f}, + {0.0f, 0.0f, 93.0f, 0.0f} + }, + { + {101.0f, 0.0f, 103.0f, 0.0f}, + {111.0f, 112.0f, 113.0f, 114.0f}, + {121.0f, 122.0f, 0.0f, 124.0f}, + {0.0f, 132.0f, 0.0f, 0.0f}, + {0.0f, 0.0f, 143.0f, 0.0f} + } + }; + final boolean[][][] isNull = + new boolean[][][] { + { + {false, true, false, false}, + {false, false, false, true}, + {false, false, true, true}, + {true, false, true, true}, + {true, true, false, true} + }, + { + {false, true, false, true}, + {false, false, false, true}, + {false, false, true, false}, + {true, false, true, true}, + {true, true, false, true} + }, + { + {false, true, false, true}, + {false, false, false, false}, + {false, false, true, false}, + {true, false, true, true}, + {true, true, false, true} + } + }; + + @Override + public OperatorContext getOperatorContext() { + return null; + } + + @Override + public TsBlock next() { + TsBlockBuilder builder = + new TsBlockBuilder( + ImmutableList.of( + TSDataType.FLOAT, + TSDataType.FLOAT, + TSDataType.FLOAT, + TSDataType.FLOAT)); + for (int i = 0; i < 5; i++) { + builder.getTimeColumnBuilder().writeLong((4 - i) + (2 - index) * 5L); + for (int j = 0; j < 4; j++) { + if (isNull[index][i][j]) { + builder.getColumnBuilder(j).appendNull(); + } else { + builder.getColumnBuilder(j).writeFloat(value[index][i][j]); + } + } + builder.declarePosition(); + } + index++; + return builder.build(); + } + + @Override + public boolean hasNext() { + return index < 3; + } + + @Override + public boolean isFinished() { + return index >= 3; + } + }); + + float[][][] res = + new float[][][] { + { + {1.0f, 0.0f, 3.0f, 4.0f}, + {11.0f, 12.0f, 13.0f, 39.0f}, + {21.0f, 22.0f, 28.0f, 39.0f}, + {36.0f, 32.0f, 28.0f, 39.0f}, + {36.0f, 47.0f, 43.0f, 39.0f} + }, + { + {51.0f, 47.0f, 53.0f, 39.0f}, + {61.0f, 62.0f, 63.0f, 39.0f}, + {71.0f, 72.0f, 78.0f, 74.0f}, + {86.0f, 82.0f, 78.0f, 94.0f}, + {86.0f, 97.0f, 93.0f, 94.0f} + }, + { + {101.0f, 97.0f, 103.0f, 94.0f}, + {111.0f, 112.0f, 113.0f, 114.0f}, + {121.0f, 122.0f, 128.0f, 124.0f}, + {0.0f, 132.0f, 128.0f, 0.0f}, + {0.0f, 0.0f, 143.0f, 0.0f} + } + }; + boolean[][][] isNull = + new boolean[][][] { + { + {false, true, false, false}, + {false, false, false, false}, + {false, false, false, false}, + {false, false, false, false}, + {false, false, false, false} + }, + { + {false, false, false, false}, + {false, false, false, false}, + {false, false, false, false}, + {false, false, false, false}, + {false, false, false, false} + }, + { + {false, false, false, false}, + {false, false, false, false}, + {false, false, false, false}, + {true, false, false, true}, + {true, true, false, true} + } + }; + + boolean[] nullBlock = new boolean[] {true, false, false, false}; + int nullBlockIndex = 0; + int count = 0; + while (fillOperator.hasNext()) { + TsBlock block = fillOperator.next(); + assertEquals(nullBlock[nullBlockIndex++], block == null); + if (block == null) { + continue; + } + for (int i = 0; i < block.getPositionCount(); i++) { + long expectedTime = (block.getPositionCount() - i - 1) + (res.length - count - 1) * 5L; + assertEquals(expectedTime, block.getTimeByIndex(i)); + for (int j = 0; j < 4; j++) { + assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i)); + if (!isNull[count][i][j]) { + assertEquals(res[count][i][j], block.getColumn(j).getFloat(i), 0.00001f); + } + } + } + count++; + } + + assertTrue(fillOperator.isFinished()); + assertEquals(res.length, count); + assertEquals(nullBlock.length, nullBlockIndex); + + } finally { + instanceNotificationExecutor.shutdown(); + } + } + @Test public void batchLinearFillTest2() { ExecutorService instanceNotificationExecutor = @@ -257,12 +462,14 @@ public class LinearFillOperatorTest { fragmentInstanceContext.addOperatorContext( 1, planNodeId1, LinearFillOperator.class.getSimpleName()); + boolean ascending = true; + TimeComparator timeComparator = new AscTimeComparator(); LinearFill[] fillArray = new LinearFill[] { - new FloatLinearFill(), - new FloatLinearFill(), - new FloatLinearFill(), - new FloatLinearFill() + new FloatLinearFill(ascending, timeComparator), + new FloatLinearFill(ascending, timeComparator), + new FloatLinearFill(ascending, timeComparator), + new FloatLinearFill(ascending, timeComparator) }; LinearFillOperator fillOperator = new LinearFillOperator( @@ -439,6 +646,206 @@ public class LinearFillOperatorTest { } } + @Test + public void batchLinearFillTest2OrderByDesc() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId1, LinearFillOperator.class.getSimpleName()); + + boolean ascending = false; + TimeComparator timeComparator = new DescTimeComparator(); + LinearFill[] fillArray = + new LinearFill[] { + new FloatLinearFill(ascending, timeComparator), + new FloatLinearFill(ascending, timeComparator), + new FloatLinearFill(ascending, timeComparator), + new FloatLinearFill(ascending, timeComparator) + }; + LinearFillOperator fillOperator = + new LinearFillOperator( + fragmentInstanceContext.getOperatorContexts().get(0), + fillArray, + new Operator() { + private int index = 0; + private final float[][][] value = + new float[][][] { + { + {1.0f, 0.0f, 3.0f, 4.0f}, + {11.0f, 12.0f, 13.0f, 0.0f}, + {21.0f, 22.0f, 0.0f, 0.0f}, + {0.0f, 32.0f, 0.0f, 0.0f}, + {0.0f, 0.0f, 0.0f, 0.0f} + }, + { + {51.0f, 0.0f, 0.0f, 0.0f}, + {61.0f, 62.0f, 0.0f, 0.0f}, + {71.0f, 72.0f, 0.0f, 74.0f}, + {0.0f, 82.0f, 0.0f, 0.0f}, + {0.0f, 0.0f, 0.0f, 0.0f} + }, + { + {101.0f, 0.0f, 103.0f, 0.0f}, + {111.0f, 112.0f, 0.0f, 114.0f}, + {121.0f, 122.0f, 0.0f, 124.0f}, + {0.0f, 132.0f, 0.0f, 0.0f}, + {0.0f, 0.0f, 0.0f, 0.0f} + } + }; + final boolean[][][] isNull = + new boolean[][][] { + { + {false, true, false, false}, + {false, false, false, true}, + {false, false, true, true}, + {true, false, true, true}, + {true, true, true, true} + }, + { + {false, true, true, true}, + {false, false, true, true}, + {false, false, true, false}, + {true, false, true, true}, + {true, true, true, true} + }, + { + {false, true, false, true}, + {false, false, true, false}, + {false, false, true, false}, + {true, false, true, true}, + {true, true, true, true} + } + }; + + @Override + public OperatorContext getOperatorContext() { + return null; + } + + @Override + public TsBlock next() { + TsBlockBuilder builder = + new TsBlockBuilder( + ImmutableList.of( + TSDataType.FLOAT, + TSDataType.FLOAT, + TSDataType.FLOAT, + TSDataType.FLOAT)); + for (int i = 0; i < 5; i++) { + builder.getTimeColumnBuilder().writeLong((4 - i) + (2 - index) * 5L); + for (int j = 0; j < 4; j++) { + if (isNull[index][i][j]) { + builder.getColumnBuilder(j).appendNull(); + } else { + builder.getColumnBuilder(j).writeFloat(value[index][i][j]); + } + } + builder.declarePosition(); + } + index++; + return builder.build(); + } + + @Override + public boolean hasNext() { + return index < 3; + } + + @Override + public boolean isFinished() { + return index >= 3; + } + }); + + int count = 0; + float[][][] res = + new float[][][] { + { + {1.0f, 0.0f, 3.0f, 4.0f}, + {11.0f, 12.0f, 13.0f, 39.0f}, + {21.0f, 22.0f, 58.0f, 39.0f}, + {36.0f, 32.0f, 58.0f, 39.0f}, + {36.0f, 47.0f, 58.0f, 39.0f} + }, + { + {51.0f, 47.0f, 58.0f, 39.0f}, + {61.0f, 62.0f, 58.0f, 39.0f}, + {71.0f, 72.0f, 58.0f, 74.0f}, + {86.0f, 82.0f, 58.0f, 94.0f}, + {86.0f, 97.0f, 58.0f, 94.0f} + }, + { + {101.0f, 97.0f, 103.0f, 94.0f}, + {111.0f, 112.0f, 0.0f, 114.0f}, + {121.0f, 122.0f, 0.0f, 124.0f}, + {0.0f, 132.0f, 0.0f, 0.0f}, + {0.0f, 0.0f, 0.0f, 0.0f} + } + }; + boolean[][][] isNull = + new boolean[][][] { + { + {false, true, false, false}, + {false, false, false, false}, + {false, false, false, false}, + {false, false, false, false}, + {false, false, false, false} + }, + { + {false, false, false, false}, + {false, false, false, false}, + {false, false, false, false}, + {false, false, false, false}, + {false, false, false, false} + }, + { + {false, false, false, false}, + {false, false, true, false}, + {false, false, true, false}, + {true, false, true, true}, + {true, true, true, true} + } + }; + + boolean[] nullBlock = new boolean[] {true, true, false, false, false}; + int nullBlockIndex = 0; + while (fillOperator.hasNext()) { + TsBlock block = fillOperator.next(); + assertEquals(nullBlock[nullBlockIndex++], block == null); + if (block == null) { + continue; + } + for (int i = 0; i < block.getPositionCount(); i++) { + long expectedTime = (block.getPositionCount() - i - 1) + (res.length - count - 1) * 5L; + assertEquals(expectedTime, block.getTimeByIndex(i)); + for (int j = 0; j < 4; j++) { + assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i)); + if (!isNull[count][i][j]) { + assertEquals(res[count][i][j], block.getColumn(j).getFloat(i), 0.00001f); + } + } + } + count++; + } + + assertTrue(fillOperator.isFinished()); + assertEquals(res.length, count); + assertEquals(nullBlock.length, nullBlockIndex); + + } finally { + instanceNotificationExecutor.shutdown(); + } + } + @Test public void batchLinearFillTest3() { ExecutorService instanceNotificationExecutor = @@ -455,7 +862,8 @@ public class LinearFillOperatorTest { fragmentInstanceContext.addOperatorContext( 1, planNodeId1, LinearFillOperator.class.getSimpleName()); - LinearFill[] fillArray = new LinearFill[] {new FloatLinearFill()}; + LinearFill[] fillArray = + new LinearFill[] {new FloatLinearFill(true, new AscTimeComparator())}; LinearFillOperator fillOperator = new LinearFillOperator( fragmentInstanceContext.getOperatorContexts().get(0), @@ -543,4 +951,110 @@ public class LinearFillOperatorTest { instanceNotificationExecutor.shutdown(); } } + + @Test + public void batchLinearFillTest3OrderByDesc() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId1, LinearFillOperator.class.getSimpleName()); + + LinearFill[] fillArray = + new LinearFill[] {new FloatLinearFill(false, new DescTimeComparator())}; + LinearFillOperator fillOperator = + new LinearFillOperator( + fragmentInstanceContext.getOperatorContexts().get(0), + fillArray, + new Operator() { + private int index = 0; + private final float[][][] value = + new float[][][] { + {{0.0f}}, {{2.0f}}, {{3.0f}}, {{4.0f}}, {{0.0f}}, {{0.0f}}, {{0.0f}} + }; + final boolean[][][] isNull = + new boolean[][][] { + {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, {{true}}, {{true}} + }; + + @Override + public OperatorContext getOperatorContext() { + return null; + } + + @Override + public TsBlock next() { + TsBlockBuilder builder = new TsBlockBuilder(ImmutableList.of(TSDataType.FLOAT)); + for (int i = 0; i < 1; i++) { + builder.getTimeColumnBuilder().writeLong(i + (6 - index)); + for (int j = 0; j < 1; j++) { + if (isNull[index][i][j]) { + builder.getColumnBuilder(j).appendNull(); + } else { + builder.getColumnBuilder(j).writeFloat(value[index][i][j]); + } + } + builder.declarePosition(); + } + index++; + return builder.build(); + } + + @Override + public boolean hasNext() { + return index < 7; + } + + @Override + public boolean isFinished() { + return index >= 7; + } + }); + + int count = 0; + float[][][] res = + new float[][][] {{{0.0f}}, {{2.0f}}, {{3.0f}}, {{4.0f}}, {{0.0f}}, {{0.0f}}, {{0.0f}}}; + boolean[][][] isNull = + new boolean[][][] { + {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, {{true}}, {{true}} + }; + + boolean[] nullBlock = + new boolean[] {true, false, false, false, false, true, true, true, false, false, false}; + int nullBlockIndex = 0; + while (fillOperator.hasNext()) { + TsBlock block = fillOperator.next(); + assertEquals(nullBlock[nullBlockIndex++], block == null); + if (block == null) { + continue; + } + for (int i = 0; i < block.getPositionCount(); i++) { + long expectedTime = (block.getPositionCount() - i - 1) + (res.length - count - 1); + assertEquals(expectedTime, block.getTimeByIndex(i)); + for (int j = 0; j < 1; j++) { + assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i)); + if (!isNull[count][i][j]) { + assertEquals(res[count][i][j], block.getColumn(j).getFloat(i), 0.00001f); + } + } + } + count++; + } + + assertTrue(fillOperator.isFinished()); + assertEquals(res.length, count); + assertEquals(nullBlock.length, nullBlockIndex); + + } finally { + instanceNotificationExecutor.shutdown(); + } + } }
