This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-3722 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e417bcc390506dd9009536a78acf3c463e1239ec Author: JackieTien97 <[email protected]> AuthorDate: Tue Jul 5 16:59:22 2022 +0800 [IOTDB-3722] Extend Fill function --- .../operator/process/LinearFillOperator.java | 6 +- .../operator/process/fill/ILinearFill.java | 54 +++++++++++ .../process/fill/identity/IdentityFill.java | 30 ++++++ .../process/fill/identity/IdentityLinearFill.java | 42 +++++++++ .../operator/process/fill/linear/LinearFill.java | 6 +- .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 31 ------- .../db/mpp/plan/planner/LocalExecutionPlanner.java | 20 +++- .../execution/operator/LinearFillOperatorTest.java | 103 +++++++++++++++++++++ 8 files changed, 252 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java index 8193e20684..125cda1eae 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/LinearFillOperator.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; -import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill; +import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.column.Column; @@ -37,7 +37,7 @@ import static java.util.Objects.requireNonNull; public class LinearFillOperator implements ProcessOperator { private final OperatorContext operatorContext; - private final LinearFill[] fillArray; + private final ILinearFill[] fillArray; private final Operator child; private final int outputColumnCount; // TODO need to spill it to disk if it consumes too much memory @@ -52,7 +52,7 @@ public class LinearFillOperator implements ProcessOperator { private boolean noMoreTsBlock; public LinearFillOperator( - OperatorContext operatorContext, LinearFill[] fillArray, Operator child) { + OperatorContext operatorContext, ILinearFill[] fillArray, Operator child) { this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); checkArgument( fillArray != null && fillArray.length > 0, "fillArray should not be null or empty"); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java new file mode 100644 index 0000000000..7bdd899ad3 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/ILinearFill.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.execution.operator.process.fill; + +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; + +public interface ILinearFill { + + /** + * Before we call this method, we need to make sure the nextValue has been prepared or noMoreNext + * has been set to true + * + * @param timeColumn TimeColumn of valueColumn + * @param valueColumn valueColumn that need to be filled + * @return Value Column that has been filled + */ + Column fill(TimeColumn timeColumn, Column valueColumn); + + /** + * @param time end time of current valueColumn that need to be filled + * @param valueColumn valueColumn that need to be filled + * @return true if valueColumn can't be filled using current information, and we need to get next + * TsBlock and then call prepareForNext. false if valueColumn can be filled using current + * information, and we can directly call fill() function + */ + boolean needPrepareForNext(long time, Column valueColumn); + + /** + * @param time end time of current valueColumn that need to be filled + * @param nextTimeColumn TimeColumn of next TsBlock + * @param nextValueColumn Value Column of next TsBlock + * @return true if we get enough information to fill current column, and we can stop getting next + * TsBlock and calling prepareForNext. false if we still don't get enough information to fill + * current column, and still need to keep getting next TsBlock and then call prepareForNext + */ + boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column nextValueColumn); +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityFill.java new file mode 100644 index 0000000000..25a5264bb7 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityFill.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.execution.operator.process.fill.identity; + +import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill; +import org.apache.iotdb.tsfile.read.common.block.column.Column; + +public class IdentityFill implements IFill { + + @Override + public Column fill(Column valueColumn) { + return valueColumn; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java new file mode 100644 index 0000000000..9b1cb93a5a --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/identity/IdentityLinearFill.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.execution.operator.process.fill.identity; + +import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; + +public class IdentityLinearFill implements ILinearFill { + + @Override + public Column fill(TimeColumn timeColumn, Column valueColumn) { + return valueColumn; + } + + @Override + public boolean needPrepareForNext(long time, Column valueColumn) { + return false; + } + + @Override + public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column nextValueColumn) { + throw new IllegalArgumentException( + "We won't call prepareForNext in IdentityLinearFill, because needPrepareForNext() method will always return false."); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java index 92685b9c38..b813661058 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/fill/linear/LinearFill.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.mpp.execution.operator.process.fill.linear; +import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.merge.TimeComparator; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn; @@ -31,7 +32,7 @@ import static com.google.common.base.Preconditions.checkArgument; * the closest timestamp after T. Linear Fill function calculation only supports numeric types * including long, int, double and float. */ -public abstract class LinearFill { +public abstract class LinearFill implements ILinearFill { private final TimeComparator timeComparator; @@ -56,6 +57,7 @@ public abstract class LinearFill { * @param valueColumn valueColumn that need to be filled * @return Value Column that has been filled */ + @Override public Column fill(TimeColumn timeColumn, Column valueColumn) { int size = valueColumn.getPositionCount(); // if this valueColumn is empty, just return itself; @@ -119,6 +121,7 @@ public abstract class LinearFill { * TsBlock and then call prepareForNext. false if valueColumn can be filled using current * information, and we can directly call fill() function */ + @Override public boolean needPrepareForNext(long time, Column valueColumn) { return timeComparator.inFillBound(nextTime, time) && valueColumn.isNull(valueColumn.getPositionCount() - 1); @@ -132,6 +135,7 @@ public abstract class LinearFill { * TsBlock and calling prepareForNext. false if we still don't get enough information to fill * current column, and still need to keep getting next TsBlock and then call prepareForNext */ + @Override public boolean prepareForNext(long time, TimeColumn nextTimeColumn, Column nextValueColumn) { checkArgument( nextTimeColumn.getPositionCount() > 0 diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java index 0a4aeccfac..554b821e15 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java @@ -49,7 +49,6 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement; import org.apache.iotdb.db.mpp.plan.statement.StatementNode; import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor; import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent; -import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy; import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent; import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy; import org.apache.iotdb.db.mpp.plan.statement.component.ResultColumn; @@ -412,36 +411,6 @@ public class Analyzer { FillComponent fillComponent = queryStatement.getFillComponent(); List<Expression> fillColumnList = outputExpressions.stream().map(Pair::getLeft).distinct().collect(Collectors.toList()); - if (fillComponent.getFillPolicy() == FillPolicy.VALUE) { - for (Expression fillColumn : fillColumnList) { - TSDataType checkedDataType = typeProvider.getType(fillColumn.getExpressionString()); - if (!fillComponent.getFillValue().isDataTypeConsistency(checkedDataType)) { - throw new SemanticException( - String.format( - "Data type mismatch: column '%s' (dataType '%s') doesn't support fill with '%s' (dataType '%s').", - fillColumn.getExpressionString(), - checkedDataType, - fillComponent.getFillValue().getBinary(), - fillComponent.getFillValue().getDataTypeString())); - } - } - } else if (fillComponent.getFillPolicy() == FillPolicy.LINEAR) { - // TODO support linear fill in align by device query - if (queryStatement.isAlignByDevice()) { - throw new SemanticException( - "Linear fill is not supported in align by device query yet."); - } - - for (Expression fillColumn : fillColumnList) { - TSDataType checkedDataType = typeProvider.getType(fillColumn.getExpressionString()); - if (!checkedDataType.isNumeric()) { - throw new SemanticException( - String.format( - "Data type mismatch: column '%s' (dataType '%s') doesn't support linear fill.", - fillColumn.getExpressionString(), checkedDataType)); - } - } - } analysis.setFillDescriptor( new FillDescriptor(fillComponent.getFillPolicy(), fillComponent.getFillValue())); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index ce8afcf154..98b6569433 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -59,16 +59,18 @@ import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator; import org.apache.iotdb.db.mpp.execution.operator.process.UpdateLastCacheOperator; import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill; +import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BooleanConstantFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.DoubleConstantFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.FloatConstantFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.IntConstantFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.LongConstantFill; +import org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityFill; +import org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityLinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.DoubleLinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.IntLinearFill; -import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LongLinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.BinaryPreviousFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.previous.BooleanPreviousFill; @@ -180,6 +182,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import static org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter; import static org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode; +import static org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.BOOLEAN; /** * Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable @@ -197,6 +200,10 @@ public class LocalExecutionPlanner { private static final TimeComparator DESC_TIME_COMPARATOR = new DescTimeComparator(); + private static final IdentityFill IDENTITY_FILL = new IdentityFill(); + + private static final IdentityLinearFill IDENTITY_LINEAR_FILL = new IdentityLinearFill(); + public static LocalExecutionPlanner getInstance() { return InstanceHolder.INSTANCE; } @@ -676,6 +683,10 @@ public class LocalExecutionPlanner { int inputColumns, List<TSDataType> inputDataTypes, Literal literal) { IFill[] constantFill = new IFill[inputColumns]; for (int i = 0; i < inputColumns; i++) { + if (!literal.isDataTypeConsistency(inputDataTypes.get(i))) { + constantFill[i] = IDENTITY_FILL; + continue; + } switch (inputDataTypes.get(i)) { case BOOLEAN: constantFill[i] = new BooleanConstantFill(literal.getBoolean()); @@ -731,9 +742,9 @@ public class LocalExecutionPlanner { return previousFill; } - private LinearFill[] getLinearFill( + private ILinearFill[] getLinearFill( int inputColumns, List<TSDataType> inputDataTypes, boolean ascending) { - LinearFill[] linearFill = new LinearFill[inputColumns]; + ILinearFill[] linearFill = new ILinearFill[inputColumns]; TimeComparator timeComparator = ascending ? ASC_TIME_COMPARATOR : DESC_TIME_COMPARATOR; for (int i = 0; i < inputColumns; i++) { switch (inputDataTypes.get(i)) { @@ -751,8 +762,7 @@ public class LocalExecutionPlanner { break; case BOOLEAN: case TEXT: - throw new UnsupportedOperationException( - "DataType: " + inputDataTypes.get(i) + " doesn't support linear fill."); + linearFill[i] = IDENTITY_LINEAR_FILL; default: throw new IllegalArgumentException("Unknown data type: " + inputDataTypes.get(i)); } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java index 9852707056..23ca161fa2 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/LinearFillOperatorTest.java @@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill; +import org.apache.iotdb.db.mpp.execution.operator.process.fill.identity.IdentityLinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.FloatLinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.linear.LinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; @@ -42,6 +44,7 @@ import java.util.concurrent.ExecutorService; import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class LinearFillOperatorTest { @@ -1057,4 +1060,104 @@ public class LinearFillOperatorTest { instanceNotificationExecutor.shutdown(); } } + + @Test + public void batchLinearFillBooleanTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId1, LinearFillOperator.class.getSimpleName()); + + ILinearFill[] fillArray = new ILinearFill[] {new IdentityLinearFill()}; + LinearFillOperator fillOperator = + new LinearFillOperator( + fragmentInstanceContext.getOperatorContexts().get(0), + fillArray, + new Operator() { + private int index = 0; + private final boolean[][][] value = + new boolean[][][] { + {{true}}, {{true}}, {{false}}, {{false}}, {{true}}, {{false}}, {{true}} + }; + final boolean[][][] isNull = + new boolean[][][] { + {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, {{true}}, {{true}} + }; + + @Override + public OperatorContext getOperatorContext() { + return null; + } + + @Override + public TsBlock next() { + TsBlockBuilder builder = new TsBlockBuilder(ImmutableList.of(TSDataType.BOOLEAN)); + for (int i = 0; i < 1; i++) { + builder.getTimeColumnBuilder().writeLong(i + index); + for (int j = 0; j < 1; j++) { + if (isNull[index][i][j]) { + builder.getColumnBuilder(j).appendNull(); + } else { + builder.getColumnBuilder(j).writeBoolean(value[index][i][j]); + } + } + builder.declarePosition(); + } + index++; + return builder.build(); + } + + @Override + public boolean hasNext() { + return index < 7; + } + + @Override + public boolean isFinished() { + return index >= 7; + } + }); + + int count = 0; + boolean[][][] res = + new boolean[][][] { + {{true}}, {{true}}, {{false}}, {{false}}, {{true}}, {{false}}, {{true}} + }; + boolean[][][] isNull = + new boolean[][][] { + {{true}}, {{false}}, {{false}}, {{false}}, {{true}}, {{true}}, {{true}} + }; + + while (fillOperator.hasNext()) { + TsBlock block = fillOperator.next(); + assertNotNull(block); + for (int i = 0; i < block.getPositionCount(); i++) { + long expectedTime = i + count; + assertEquals(expectedTime, block.getTimeByIndex(i)); + for (int j = 0; j < 1; j++) { + assertEquals(isNull[count][i][j], block.getColumn(j).isNull(i)); + if (!isNull[count][i][j]) { + assertEquals(res[count][i][j], block.getColumn(j).getBoolean(i)); + } + } + } + count++; + } + + assertTrue(fillOperator.isFinished()); + assertEquals(res.length, count); + + } finally { + instanceNotificationExecutor.shutdown(); + } + } }
