This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/intoOperator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2293ea4ca46e4ec95ff65f7950f69b1b5ca0429c Author: Minghui Liu <[email protected]> AuthorDate: Sun Oct 16 19:01:35 2022 +0800 finish IntoOperator v1 --- .../execution/operator/process/IntoOperator.java | 65 +++++++++++++++++++--- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java index c5ca4cc091..1251bb3e50 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java @@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.mpp.common.header.ColumnHeader; +import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; @@ -31,15 +33,21 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; +import org.apache.iotdb.tsfile.utils.Pair; import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; public class IntoOperator implements ProcessOperator { @@ -47,18 +55,24 @@ public class IntoOperator implements ProcessOperator { private final Operator child; private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators; + private final List<Pair<String, String>> sourceTargetPathPairList; + private final Map<String, InputLocation> sourceColumnToInputLocationMap; public IntoOperator( OperatorContext operatorContext, Operator child, Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap, Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap, - Map<PartialPath, Boolean> targetDeviceToAlignedMap) { + Map<PartialPath, Boolean> targetDeviceToAlignedMap, + List<Pair<String, String>> sourceTargetPathPairList, + Map<String, InputLocation> sourceColumnToInputLocationMap) { this.operatorContext = operatorContext; this.child = child; this.insertTabletStatementGenerators = constructInsertTabletStatementGenerators( targetPathToSourceMap, targetPathToDataTypeMap, targetDeviceToAlignedMap); + this.sourceTargetPathPairList = sourceTargetPathPairList; + this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap; } private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators( @@ -129,11 +143,34 @@ public class IntoOperator implements ProcessOperator { } private TsBlock constructResultTsBlock() { - List<TSDataType> dataTypes = new ArrayList<>(); - TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(dataTypes); + List<TSDataType> outputDataTypes = + ColumnHeaderConstant.selectIntoColumnHeaders.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList()); + TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); + TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); + ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); + for (Pair<String, String> sourceTargetPathPair : sourceTargetPathPairList) { + timeColumnBuilder.writeLong(0); + columnBuilders[0].writeBinary(new Binary(sourceTargetPathPair.left)); + columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right)); + columnBuilders[2].writeInt(findWritten(sourceTargetPathPair.left)); + resultTsBlockBuilder.declarePosition(); + } return resultTsBlockBuilder.build(); } + private int findWritten(String sourceColumn) { + InputLocation inputLocation = sourceColumnToInputLocationMap.get(sourceColumn); + for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + int written = generator.getWrittenCount(inputLocation); + if (written != -1) { + return written; + } + } + return 0; + } + @Override public boolean hasNext() { return child.hasNext(); @@ -151,20 +188,20 @@ public class IntoOperator implements ProcessOperator { @Override public long calculateMaxPeekMemory() { - return 0; + return child.calculateMaxPeekMemory(); } @Override public long calculateMaxReturnSize() { - return 0; + return child.calculateMaxReturnSize(); } @Override public long calculateRetainedSizeAfterCallingNext() { - return 0; + return child.calculateRetainedSizeAfterCallingNext(); } - private static class InsertTabletStatementGenerator { + public static class InsertTabletStatementGenerator { private final int TABLET_ROW_LIMIT = IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit(); @@ -181,6 +218,8 @@ public class IntoOperator implements ProcessOperator { private Object[] columns; private BitMap[] bitMaps; + private final Map<InputLocation, AtomicInteger> writtenCounter; + public InsertTabletStatementGenerator( PartialPath devicePath, Map<String, InputLocation> measurementToInputLocationMap, @@ -191,6 +230,10 @@ public class IntoOperator implements ProcessOperator { this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]); this.dataTypes = measurementToDataTypeMap.values().toArray(new TSDataType[0]); this.inputLocations = measurementToInputLocationMap.values().toArray(new InputLocation[0]); + this.writtenCounter = new HashMap<>(); + for (InputLocation inputLocation : inputLocations) { + writtenCounter.put(inputLocation, new AtomicInteger(0)); + } } public void reset() { @@ -219,6 +262,7 @@ public class IntoOperator implements ProcessOperator { } bitMaps[i].unmark(rowCount); + writtenCounter.get(inputLocations[i]).getAndIncrement(); switch (valueColumn.getDataType()) { case INT32: ((int[]) columns[i])[rowCount] = valueColumn.getInt(lastReadIndex); @@ -305,5 +349,12 @@ public class IntoOperator implements ProcessOperator { return insertTabletStatement; } + + public int getWrittenCount(InputLocation inputLocation) { + if (!writtenCounter.containsKey(inputLocation)) { + return -1; + } + return writtenCounter.get(inputLocation).get(); + } } }
