This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/intoOperator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bfad22909929b3480d473b41f3c2d543b4eaab6d Author: Minghui Liu <[email protected]> AuthorDate: Sun Oct 16 11:13:50 2022 +0800 add IntoOperator --- .../execution/operator/process/IntoOperator.java | 309 +++++++++++++++++++++ .../process/RawDataAggregationOperator.java | 2 +- .../process/SingleInputAggregationOperator.java | 2 - .../process/SlidingWindowAggregationOperator.java | 2 +- 4 files changed, 311 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java new file mode 100644 index 0000000000..c5ca4cc091 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Operator that copies the rows produced by its child into one tablet buffer per target device
+ * and, whenever the buffers are full (or the child is exhausted), bundles them into an {@link
+ * InsertMultiTabletsStatement}.
+ *
+ * <p>Executing the built statement is still a TODO, so this operator currently only assembles the
+ * statements.
+ */
+public class IntoOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final Operator child;
+
+  /** One buffer per target device; all buffers are filled and reset together. */
+  private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
+
+  /**
+   * @param operatorContext context returned by {@link #getOperatorContext()}
+   * @param child operator that supplies the source rows
+   * @param targetPathToSourceMap target device -> (target measurement -> source column location)
+   * @param targetPathToDataTypeMap target device -> (target measurement -> data type)
+   * @param targetDeviceToAlignedMap target device -> aligned flag passed to the insert statement
+   */
+  public IntoOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+      Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.insertTabletStatementGenerators =
+        constructInsertTabletStatementGenerators(
+            targetPathToSourceMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+  }
+
+  /** Builds one generator for every target device found in {@code targetPathToSourceMap}. */
+  private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+      Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+    List<InsertTabletStatementGenerator> generators =
+        new ArrayList<>(targetPathToSourceMap.size());
+    for (Map.Entry<PartialPath, Map<String, InputLocation>> entry :
+        targetPathToSourceMap.entrySet()) {
+      PartialPath targetDevice = entry.getKey();
+      generators.add(
+          new InsertTabletStatementGenerator(
+              targetDevice,
+              entry.getValue(),
+              targetPathToDataTypeMap.get(targetDevice),
+              targetDeviceToAlignedMap.get(targetDevice)));
+    }
+    return generators;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * Consumes one block from the child and buffers all of its rows.
+   *
+   * @return {@code null} while the child still has data; otherwise the result block built by
+   *     {@link #constructResultTsBlock()}, after the remaining buffered rows have been flushed
+   */
+  @Override
+  public TsBlock next() {
+    TsBlock inputTsBlock = child.next();
+    // Without the emptiness guard the loop below could never advance.
+    if (inputTsBlock != null && !insertTabletStatementGenerators.isEmpty()) {
+      int readIndex = 0;
+      while (readIndex < inputTsBlock.getPositionCount()) {
+        // Every generator must see the same rows, so each one starts from the same index.
+        int nextReadIndex = readIndex;
+        for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+          nextReadIndex =
+              Math.max(nextReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
+        }
+        readIndex = nextReadIndex;
+        insertMultiTabletsInternally(true);
+      }
+    }
+
+    if (child.hasNext()) {
+      return null;
+    }
+    insertMultiTabletsInternally(false);
+    return constructResultTsBlock();
+  }
+
+  /**
+   * Turns the buffered rows into an {@link InsertMultiTabletsStatement} and resets the buffers.
+   *
+   * @param needCheck if {@code true}, nothing happens unless the buffers are full; if {@code
+   *     false}, any non-empty buffers are flushed
+   */
+  private void insertMultiTabletsInternally(boolean needCheck) {
+    if (insertTabletStatementGenerators.isEmpty()) {
+      return;
+    }
+    // All generators hold the same number of rows, so the first one is representative.
+    InsertTabletStatementGenerator first = insertTabletStatementGenerators.get(0);
+    if (first.isEmpty() || (needCheck && !first.isFull())) {
+      return;
+    }
+
+    List<InsertTabletStatement> insertTabletStatementList =
+        new ArrayList<>(insertTabletStatementGenerators.size());
+    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+      insertTabletStatementList.add(generator.constructInsertTabletStatement());
+    }
+
+    InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
+    insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+    // TODO: execute insertMultiTabletsStatement
+
+    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+      generator.reset();
+    }
+  }
+
+  /** Builds the block returned once the child is exhausted; it has no value columns. */
+  private TsBlock constructResultTsBlock() {
+    List<TSDataType> dataTypes = new ArrayList<>();
+    TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+    return resultTsBlockBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return child.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return child.isFinished();
+  }
+
+  // The three memory estimates below are placeholders that report 0.
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+
+  /** Row buffer for one target device that can be turned into an InsertTabletStatement. */
+  private static final class InsertTabletStatementGenerator {
+
+    /** Row capacity of one tablet; read from the configuration at construction time. */
+    private final int tabletRowLimit =
+        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+
+    private final PartialPath devicePath;
+    private final boolean isAligned;
+    private final String[] measurements;
+    private final TSDataType[] dataTypes;
+    private final InputLocation[] inputLocations;
+
+    private int rowCount = 0;
+
+    private long[] times;
+    private Object[] columns;
+    private BitMap[] bitMaps;
+
+    /**
+     * @param devicePath target device
+     * @param measurementToInputLocationMap target measurement -> source column location
+     * @param measurementToDataTypeMap target measurement -> data type of its value array
+     * @param isAligned aligned flag forwarded to the insert statement
+     * @throws IllegalStateException if the configured row limit is not positive
+     * @throws IllegalArgumentException if a measurement has no data type
+     */
+    public InsertTabletStatementGenerator(
+        PartialPath devicePath,
+        Map<String, InputLocation> measurementToInputLocationMap,
+        Map<String, TSDataType> measurementToDataTypeMap,
+        Boolean isAligned) {
+      if (tabletRowLimit <= 0) {
+        // A non-positive capacity could never accept a row and would stall the read loop.
+        throw new IllegalStateException(
+            String.format("Tablet row limit must be positive, but is %d", tabletRowLimit));
+      }
+      this.devicePath = devicePath;
+      this.isAligned = isAligned;
+      this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]);
+      this.inputLocations = new InputLocation[measurements.length];
+      this.dataTypes = new TSDataType[measurements.length];
+      // Look both values up by measurement name so the three arrays are index-aligned even if
+      // the two maps iterate in different orders.
+      for (int i = 0; i < measurements.length; ++i) {
+        inputLocations[i] = measurementToInputLocationMap.get(measurements[i]);
+        dataTypes[i] = measurementToDataTypeMap.get(measurements[i]);
+        if (dataTypes[i] == null) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "No data type is specified for measurement %s of device %s",
+                  measurements[i], devicePath));
+        }
+      }
+      reset();
+    }
+
+    /**
+     * Discards the buffered rows and allocates fresh arrays; the previous arrays may already
+     * have been handed to an InsertTabletStatement.
+     */
+    public void reset() {
+      this.rowCount = 0;
+      this.times = new long[tabletRowLimit];
+      this.columns = new Object[measurements.length];
+      this.bitMaps = new BitMap[measurements.length];
+      for (int i = 0; i < measurements.length; ++i) {
+        this.columns[i] = createColumn(dataTypes[i]);
+        this.bitMaps[i] = new BitMap(tabletRowLimit);
+        // every position starts as NULL; processTsBlock unmarks the ones it fills
+        this.bitMaps[i].markAll();
+      }
+    }
+
+    /** Allocates an empty value array of {@code tabletRowLimit} elements for the given type. */
+    private Object createColumn(TSDataType dataType) {
+      switch (dataType) {
+        case BOOLEAN:
+          return new boolean[tabletRowLimit];
+        case INT32:
+          return new int[tabletRowLimit];
+        case INT64:
+          return new long[tabletRowLimit];
+        case FLOAT:
+          return new float[tabletRowLimit];
+        case DOUBLE:
+          return new double[tabletRowLimit];
+        case TEXT:
+          return new Binary[tabletRowLimit];
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", dataType));
+      }
+    }
+
+    /**
+     * Buffers rows of {@code tsBlock} starting at {@code lastReadIndex} until the block is
+     * exhausted or the tablet is full.
+     *
+     * @return index of the first row of {@code tsBlock} that has not been buffered
+     */
+    public int processTsBlock(TsBlock tsBlock, int lastReadIndex) {
+      Column[] valueColumns = tsBlock.getValueColumns();
+      int positionCount = tsBlock.getPositionCount();
+
+      while (lastReadIndex < positionCount && rowCount < tabletRowLimit) {
+        times[rowCount] = tsBlock.getTimeByIndex(lastReadIndex);
+
+        for (int i = 0; i < measurements.length; ++i) {
+          Column valueColumn = valueColumns[inputLocations[i].getValueColumnIndex()];
+
+          // if the value is NULL
+          if (valueColumn.isNull(lastReadIndex)) {
+            // bit in bitMaps are marked as 1 (NULL) by default
+            continue;
+          }
+
+          bitMaps[i].unmark(rowCount);
+          // NOTE(review): the casts assume the source column type equals dataTypes[i], which
+          // decided the array type in reset() - confirm against the planner.
+          switch (valueColumn.getDataType()) {
+            case INT32:
+              ((int[]) columns[i])[rowCount] = valueColumn.getInt(lastReadIndex);
+              break;
+            case INT64:
+              ((long[]) columns[i])[rowCount] = valueColumn.getLong(lastReadIndex);
+              break;
+            case FLOAT:
+              ((float[]) columns[i])[rowCount] = valueColumn.getFloat(lastReadIndex);
+              break;
+            case DOUBLE:
+              ((double[]) columns[i])[rowCount] = valueColumn.getDouble(lastReadIndex);
+              break;
+            case BOOLEAN:
+              ((boolean[]) columns[i])[rowCount] = valueColumn.getBoolean(lastReadIndex);
+              break;
+            case TEXT:
+              ((Binary[]) columns[i])[rowCount] = valueColumn.getBinary(lastReadIndex);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format("Data type %s is not supported.", valueColumn.getDataType()));
+          }
+        }
+
+        // Advance both counters together so the returned index never points at a row that
+        // has already been buffered.
+        ++rowCount;
+        ++lastReadIndex;
+      }
+      return lastReadIndex;
+    }
+
+    public boolean isFull() {
+      return rowCount == tabletRowLimit;
+    }
+
+    public boolean isEmpty() {
+      return rowCount == 0;
+    }
+
+    /**
+     * Builds an InsertTabletStatement from the buffered rows. If the buffer is not full, the time
+     * and value arrays are first truncated to {@code rowCount} elements.
+     */
+    public InsertTabletStatement constructInsertTabletStatement() {
+      InsertTabletStatement insertTabletStatement = new InsertTabletStatement();
+      insertTabletStatement.setDevicePath(devicePath);
+      insertTabletStatement.setAligned(isAligned);
+      insertTabletStatement.setMeasurements(measurements);
+      insertTabletStatement.setDataTypes(dataTypes);
+      insertTabletStatement.setRowCount(rowCount);
+
+      if (rowCount != tabletRowLimit) {
+        times = Arrays.copyOf(times, rowCount);
+        for (int i = 0; i < columns.length; i++) {
+          switch (dataTypes[i]) {
+            case BOOLEAN:
+              columns[i] = Arrays.copyOf((boolean[]) columns[i], rowCount);
+              break;
+            case INT32:
+              columns[i] = Arrays.copyOf((int[]) columns[i], rowCount);
+              break;
+            case INT64:
+              columns[i] = Arrays.copyOf((long[]) columns[i], rowCount);
+              break;
+            case FLOAT:
+              columns[i] = Arrays.copyOf((float[]) columns[i], rowCount);
+              break;
+            case DOUBLE:
+              columns[i] = Arrays.copyOf((double[]) columns[i], rowCount);
+              break;
+            case TEXT:
+              columns[i] = Arrays.copyOf((Binary[]) columns[i], rowCount);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format("Data type %s is not supported.", dataTypes[i]));
+          }
+        }
+      }
+
+      insertTabletStatement.setTimes(times);
+      insertTabletStatement.setColumns(columns);
+      insertTabletStatement.setBitMaps(bitMaps);
+
+      return insertTabletStatement;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index e23a1232c7..30488c18d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -53,7 +53,7 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
       Operator child,
       boolean ascending,
       long maxReturnSize) {
-    super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
+    super(operatorContext, aggregators, child, ascending, maxReturnSize);
     this.windowManager = new TimeWindowManager(timeRangeIterator);
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java index 676ccd823e..16071aea1e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.db.mpp.aggregation.Aggregator; -import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -56,7 +55,6 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator List<Aggregator> aggregators, Operator child, boolean ascending, - ITimeRangeIterator timeRangeIterator, long maxReturnSize) { this.operatorContext = operatorContext; this.ascending = ascending; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java index a1fc271b1a..76499849cb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java @@ -50,7 +50,7 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper boolean ascending, GroupByTimeParameter groupByTimeParameter, long maxReturnSize) { - super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize); + super(operatorContext, aggregators, 
child, ascending, maxReturnSize); checkArgument( groupByTimeParameter != null, "GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator");
