This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/intoOperator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit de98a1a31da1e5b20a458fa5b6602108be374aed Author: Minghui Liu <[email protected]> AuthorDate: Tue Oct 18 16:57:37 2022 +0800 implement DeviceViewIntoOperator --- ...IntoOperator.java => AbstractIntoOperator.java} | 115 +++----- .../operator/process/DeviceViewIntoOperator.java | 128 ++++++--- .../execution/operator/process/IntoOperator.java | 302 +-------------------- 3 files changed, 126 insertions(+), 419 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java similarity index 74% copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java index 1491623e77..d038874d0b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java @@ -21,8 +21,6 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.mpp.common.header.ColumnHeader; -import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; @@ -31,13 +29,9 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.read.common.block.column.Column; -import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; -import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; -import org.apache.iotdb.tsfile.utils.Pair; import com.google.common.util.concurrent.ListenableFuture; @@ -47,43 +41,37 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -public class IntoOperator implements ProcessOperator { +public abstract class AbstractIntoOperator implements ProcessOperator { - private final OperatorContext operatorContext; - private final Operator child; + protected final OperatorContext operatorContext; + protected final Operator child; - private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators; - private final List<Pair<String, PartialPath>> sourceTargetPathPairList; - private final Map<String, InputLocation> sourceColumnToInputLocationMap; + protected List<IntoOperator.InsertTabletStatementGenerator> insertTabletStatementGenerators; - public IntoOperator( + protected final Map<String, InputLocation> sourceColumnToInputLocationMap; + + public AbstractIntoOperator( OperatorContext operatorContext, Operator child, - Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap, - Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap, - Map<String, Boolean> targetDeviceToAlignedMap, - List<Pair<String, PartialPath>> sourceTargetPathPairList, + List<IntoOperator.InsertTabletStatementGenerator> insertTabletStatementGenerators, Map<String, InputLocation> sourceColumnToInputLocationMap) { this.operatorContext = operatorContext; this.child = child; - this.insertTabletStatementGenerators = - constructInsertTabletStatementGenerators( - targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap); - this.sourceTargetPathPairList = sourceTargetPathPairList; + this.insertTabletStatementGenerators = insertTabletStatementGenerators; this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap; } - private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators( - Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap, - Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap, - Map<String, Boolean> targetDeviceToAlignedMap) { - List<InsertTabletStatementGenerator> insertTabletStatementGenerators = + protected static List<IntoOperator.InsertTabletStatementGenerator> + constructInsertTabletStatementGenerators( + Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap, + Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap, + Map<String, Boolean> targetDeviceToAlignedMap) { + List<IntoOperator.InsertTabletStatementGenerator> insertTabletStatementGenerators = new ArrayList<>(targetPathToSourceInputLocationMap.size()); for (PartialPath targetDevice : targetPathToSourceInputLocationMap.keySet()) { - InsertTabletStatementGenerator generator = - new InsertTabletStatementGenerator( + IntoOperator.InsertTabletStatementGenerator generator = + new IntoOperator.InsertTabletStatementGenerator( targetDevice, targetPathToSourceInputLocationMap.get(targetDevice), targetPathToDataTypeMap.get(targetDevice), @@ -93,45 +81,14 @@ public class IntoOperator implements ProcessOperator { return insertTabletStatementGenerators; } - @Override - public OperatorContext getOperatorContext() { - return operatorContext; - } - - @Override - public ListenableFuture<?> isBlocked() { - return child.isBlocked(); - } - - @Override - public TsBlock next() { - TsBlock inputTsBlock = child.next(); - if (inputTsBlock != null) { - int lastReadIndex = 0; - while (lastReadIndex < inputTsBlock.getPositionCount()) { - for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { - lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex); - } - insertMultiTabletsInternally(true); - } - } - - if (child.hasNext()) { - return null; - } else { - insertMultiTabletsInternally(false); - return constructResultTsBlock(); - } - } - - private void insertMultiTabletsInternally(boolean needCheck) { + protected void insertMultiTabletsInternally(boolean needCheck) { if ((needCheck && !insertTabletStatementGenerators.get(0).isFull()) || insertTabletStatementGenerators.get(0).isEmpty()) { return; } List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>(); - for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { insertTabletStatementList.add(generator.constructInsertTabletStatement()); } @@ -139,32 +96,14 @@ public class IntoOperator implements ProcessOperator { insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList); // TODO: execute insertMultiTabletsStatement - for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { generator.reset(); } } - private TsBlock constructResultTsBlock() { - List<TSDataType> outputDataTypes = - ColumnHeaderConstant.selectIntoColumnHeaders.stream() - .map(ColumnHeader::getColumnType) - .collect(Collectors.toList()); - TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); - TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); - ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); - for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) { - timeColumnBuilder.writeLong(0); - columnBuilders[0].writeBinary(new Binary(sourceTargetPathPair.left)); - columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right.toString())); - columnBuilders[2].writeInt(findWritten(sourceTargetPathPair.left)); - resultTsBlockBuilder.declarePosition(); - } - return resultTsBlockBuilder.build(); - } - - private int findWritten(String sourceColumn) { + protected int findWritten(String sourceColumn) { InputLocation inputLocation = sourceColumnToInputLocationMap.get(sourceColumn); - for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { + for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { int written = generator.getWrittenCount(inputLocation); if (written != -1) { return written; @@ -173,6 +112,16 @@ public class IntoOperator implements ProcessOperator { return 0; } + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public ListenableFuture<?> isBlocked() { + return child.isBlocked(); + } + @Override public boolean hasNext() { return child.hasNext(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java index e1c38e662c..bf1a0920fa 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java @@ -19,64 +19,112 @@ package org.apache.iotdb.db.mpp.execution.operator.process; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.mpp.common.header.ColumnHeader; +import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.Pair; -import com.google.common.util.concurrent.ListenableFuture; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; -public class DeviceViewIntoOperator implements ProcessOperator { +public class DeviceViewIntoOperator extends AbstractIntoOperator { - private final OperatorContext operatorContext; - private final Operator child; + private final Map<String, Map<PartialPath, Map<String, InputLocation>>> + deviceToTargetPathSourceInputLocationMap; + private final Map<String, Map<PartialPath, Map<String, TSDataType>>> + deviceToTargetPathDataTypeMap; + private final Map<String, Boolean> targetDeviceToAlignedMap; + private final Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap; - public DeviceViewIntoOperator(OperatorContext operatorContext, Operator child) { - this.operatorContext = operatorContext; - this.child = child; - } + private String currentDevice; - @Override - public OperatorContext getOperatorContext() { - return operatorContext; - } + private final TsBlockBuilder resultTsBlockBuilder; - @Override - public ListenableFuture<?> isBlocked() { - return child.isBlocked(); - } + public DeviceViewIntoOperator( + OperatorContext operatorContext, + Operator child, + Map<String, Map<PartialPath, Map<String, InputLocation>>> + deviceToTargetPathSourceInputLocationMap, + Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap, + Map<String, Boolean> targetDeviceToAlignedMap, + Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap, + Map<String, InputLocation> sourceColumnToInputLocationMap) { + super(operatorContext, child, null, sourceColumnToInputLocationMap); + this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap; + this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap; + this.targetDeviceToAlignedMap = targetDeviceToAlignedMap; + this.deviceToSourceTargetPathPairListMap = deviceToSourceTargetPathPairListMap; - @Override - public TsBlock next() { - return null; + List<TSDataType> outputDataTypes = + ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream() + .map(ColumnHeader::getColumnType) + .collect(Collectors.toList()); + this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); } @Override - public boolean hasNext() { - return child.hasNext(); - } - - @Override - public void close() throws Exception { - child.close(); - } + public TsBlock next() { + TsBlock inputTsBlock = child.next(); + if (inputTsBlock != null) { + String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0)); + if (!Objects.equals(device, currentDevice)) { + insertMultiTabletsInternally(false); + updateResultTsBlock(); - @Override - public boolean isFinished() { - return child.isFinished(); - } + insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device); + currentDevice = device; + } + int lastReadIndex = 0; + while (lastReadIndex < inputTsBlock.getPositionCount()) { + for (IntoOperator.InsertTabletStatementGenerator generator : + insertTabletStatementGenerators) { + lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex); + } + insertMultiTabletsInternally(true); + } + } - @Override - public long calculateMaxPeekMemory() { - return child.calculateMaxPeekMemory(); + if (child.hasNext()) { + return null; + } else { + insertMultiTabletsInternally(false); + updateResultTsBlock(); + return resultTsBlockBuilder.build(); + } } - @Override - public long calculateMaxReturnSize() { - return child.calculateMaxReturnSize(); + private void updateResultTsBlock() { + TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); + ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); + for (Pair<String, PartialPath> sourceTargetPathPair : + deviceToSourceTargetPathPairListMap.get(currentDevice)) { + timeColumnBuilder.writeLong(0); + columnBuilders[0].writeBinary(new Binary(currentDevice)); + columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.left)); + columnBuilders[2].writeBinary(new Binary(sourceTargetPathPair.right.toString())); + columnBuilders[3].writeInt(findWritten(sourceTargetPathPair.left)); + resultTsBlockBuilder.declarePosition(); + } } - @Override - public long calculateRetainedSizeAfterCallingNext() { - return child.calculateRetainedSizeAfterCallingNext(); + private List<IntoOperator.InsertTabletStatementGenerator> + constructInsertTabletStatementGeneratorsByDevice(String currentDevice) { + Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap = + deviceToTargetPathSourceInputLocationMap.get(currentDevice); + Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap = + deviceToTargetPathDataTypeMap.get(currentDevice); + return constructInsertTabletStatementGenerators( + targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java index 1491623e77..6a6c2045d3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java @@ -20,43 +20,26 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.mpp.common.header.ColumnHeader; import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; -import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement; -import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement; -import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; -import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.utils.Pair; -import com.google.common.util.concurrent.ListenableFuture; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -public class IntoOperator implements ProcessOperator { - - private final OperatorContext operatorContext; - private final Operator child; +public class IntoOperator extends AbstractIntoOperator { - private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators; private final List<Pair<String, PartialPath>> sourceTargetPathPairList; - private final Map<String, InputLocation> sourceColumnToInputLocationMap; public IntoOperator( OperatorContext operatorContext, @@ -66,41 +49,13 @@ public class IntoOperator implements ProcessOperator { Map<String, Boolean> targetDeviceToAlignedMap, List<Pair<String, PartialPath>> sourceTargetPathPairList, Map<String, InputLocation> sourceColumnToInputLocationMap) { - this.operatorContext = operatorContext; - this.child = child; - this.insertTabletStatementGenerators = + super( + operatorContext, + child, constructInsertTabletStatementGenerators( - targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap); + targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap), + sourceColumnToInputLocationMap); this.sourceTargetPathPairList = sourceTargetPathPairList; - this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap; - } - - private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators( - Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap, - Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap, - Map<String, Boolean> targetDeviceToAlignedMap) { - List<InsertTabletStatementGenerator> insertTabletStatementGenerators = - new ArrayList<>(targetPathToSourceInputLocationMap.size()); - for (PartialPath targetDevice : targetPathToSourceInputLocationMap.keySet()) { - InsertTabletStatementGenerator generator = - new InsertTabletStatementGenerator( - targetDevice, - targetPathToSourceInputLocationMap.get(targetDevice), - targetPathToDataTypeMap.get(targetDevice), - targetDeviceToAlignedMap.get(targetDevice.toString())); - insertTabletStatementGenerators.add(generator); - } - return insertTabletStatementGenerators; - } - - @Override - public OperatorContext getOperatorContext() { - return operatorContext; - } - - @Override - public ListenableFuture<?> isBlocked() { - return child.isBlocked(); } @Override @@ -124,26 +79,6 @@ public class IntoOperator implements ProcessOperator { } } - private void insertMultiTabletsInternally(boolean needCheck) { - if ((needCheck && !insertTabletStatementGenerators.get(0).isFull()) - || insertTabletStatementGenerators.get(0).isEmpty()) { - return; - } - - List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>(); - for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { - insertTabletStatementList.add(generator.constructInsertTabletStatement()); - } - - InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement(); - insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList); - // TODO: execute insertMultiTabletsStatement - - for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { - generator.reset(); - } - } - private TsBlock constructResultTsBlock() { List<TSDataType> outputDataTypes = ColumnHeaderConstant.selectIntoColumnHeaders.stream() @@ -161,229 +96,4 @@ public class IntoOperator implements ProcessOperator { } return resultTsBlockBuilder.build(); } - - private int findWritten(String sourceColumn) { - InputLocation inputLocation = sourceColumnToInputLocationMap.get(sourceColumn); - for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { - int written = generator.getWrittenCount(inputLocation); - if (written != -1) { - return written; - } - } - return 0; - } - - @Override - public boolean hasNext() { - return child.hasNext(); - } - - @Override - public void close() throws Exception { - child.close(); - } - - @Override - public boolean isFinished() { - return child.isFinished(); - } - - @Override - public long calculateMaxPeekMemory() { - return child.calculateMaxPeekMemory(); - } - - @Override - public long calculateMaxReturnSize() { - return child.calculateMaxReturnSize(); - } - - @Override - public long calculateRetainedSizeAfterCallingNext() { - return child.calculateRetainedSizeAfterCallingNext(); - } - - public static class InsertTabletStatementGenerator { - - private final int TABLET_ROW_LIMIT = - IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit(); - - private final PartialPath devicePath; - private final boolean isAligned; - private final String[] measurements; - private final TSDataType[] dataTypes; - private final InputLocation[] inputLocations; - - private int rowCount = 0; - - private long[] times; - private Object[] columns; - private BitMap[] bitMaps; - - private final Map<InputLocation, AtomicInteger> writtenCounter; - - public InsertTabletStatementGenerator( - PartialPath devicePath, - Map<String, InputLocation> measurementToInputLocationMap, - Map<String, TSDataType> measurementToDataTypeMap, - Boolean isAligned) { - this.devicePath = devicePath; - this.isAligned = isAligned; - this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]); - this.dataTypes = measurementToDataTypeMap.values().toArray(new TSDataType[0]); - this.inputLocations = measurementToInputLocationMap.values().toArray(new InputLocation[0]); - this.writtenCounter = new HashMap<>(); - for (InputLocation inputLocation : inputLocations) { - writtenCounter.put(inputLocation, new AtomicInteger(0)); - } - this.reset(); - } - - public void reset() { - this.rowCount = 0; - this.times = new long[TABLET_ROW_LIMIT]; - this.columns = new Object[this.measurements.length]; - for (int i = 0; i < this.measurements.length; i++) { - switch (dataTypes[i]) { - case BOOLEAN: - columns[i] = new boolean[TABLET_ROW_LIMIT]; - break; - case INT32: - columns[i] = new int[TABLET_ROW_LIMIT]; - break; - case INT64: - columns[i] = new long[TABLET_ROW_LIMIT]; - break; - case FLOAT: - columns[i] = new float[TABLET_ROW_LIMIT]; - break; - case DOUBLE: - columns[i] = new double[TABLET_ROW_LIMIT]; - break; - case TEXT: - columns[i] = new Binary[TABLET_ROW_LIMIT]; - Arrays.fill((Binary[]) columns[i], Binary.EMPTY_VALUE); - break; - default: - throw new UnSupportedDataTypeException( - String.format("Data type %s is not supported.", dataTypes[i])); - } - } - this.bitMaps = new BitMap[this.measurements.length]; - for (int i = 0; i < this.bitMaps.length; ++i) { - this.bitMaps[i] = new BitMap(TABLET_ROW_LIMIT); - this.bitMaps[i].markAll(); - } - } - - public int processTsBlock(TsBlock tsBlock, int lastReadIndex) { - for (; lastReadIndex < tsBlock.getPositionCount(); lastReadIndex++) { - - times[rowCount] = tsBlock.getTimeByIndex(lastReadIndex); - - for (int i = 0; i < measurements.length; ++i) { - Column valueColumn = tsBlock.getValueColumns()[inputLocations[i].getValueColumnIndex()]; - - // if the value is NULL - if (valueColumn.isNull(lastReadIndex)) { - // bit in bitMaps are marked as 1 (NULL) by default - continue; - } - - bitMaps[i].unmark(rowCount); - writtenCounter.get(inputLocations[i]).getAndIncrement(); - switch (valueColumn.getDataType()) { - case INT32: - ((int[]) columns[i])[rowCount] = valueColumn.getInt(lastReadIndex); - break; - case INT64: - ((long[]) columns[i])[rowCount] = valueColumn.getLong(lastReadIndex); - break; - case FLOAT: - ((float[]) columns[i])[rowCount] = valueColumn.getFloat(lastReadIndex); - break; - case DOUBLE: - ((double[]) columns[i])[rowCount] = valueColumn.getDouble(lastReadIndex); - break; - case BOOLEAN: - ((boolean[]) columns[i])[rowCount] = valueColumn.getBoolean(lastReadIndex); - break; - case TEXT: - ((Binary[]) columns[i])[rowCount] = valueColumn.getBinary(lastReadIndex); - break; - default: - throw new UnSupportedDataTypeException( - String.format( - "data type %s is not supported when convert data at client", - valueColumn.getDataType())); - } - } - - ++rowCount; - if (rowCount == TABLET_ROW_LIMIT) { - break; - } - } - return lastReadIndex; - } - - public boolean isFull() { - return rowCount == TABLET_ROW_LIMIT; - } - - public boolean isEmpty() { - return rowCount == 0; - } - - public InsertTabletStatement constructInsertTabletStatement() { - InsertTabletStatement insertTabletStatement = new InsertTabletStatement(); - insertTabletStatement.setDevicePath(devicePath); - insertTabletStatement.setAligned(isAligned); - insertTabletStatement.setMeasurements(measurements); - insertTabletStatement.setDataTypes(dataTypes); - insertTabletStatement.setRowCount(rowCount); - - if (rowCount != TABLET_ROW_LIMIT) { - times = Arrays.copyOf(times, rowCount); - for (int i = 0; i < columns.length; i++) { - switch (dataTypes[i]) { - case BOOLEAN: - columns[i] = Arrays.copyOf((boolean[]) columns[i], rowCount); - break; - case INT32: - columns[i] = Arrays.copyOf((int[]) columns[i], rowCount); - break; - case INT64: - columns[i] = Arrays.copyOf((long[]) columns[i], rowCount); - break; - case FLOAT: - columns[i] = Arrays.copyOf((float[]) columns[i], rowCount); - break; - case DOUBLE: - columns[i] = Arrays.copyOf((double[]) columns[i], rowCount); - break; - case TEXT: - columns[i] = Arrays.copyOf((Binary[]) columns[i], rowCount); - break; - default: - throw new UnSupportedDataTypeException( - String.format("Data type %s is not supported.", dataTypes[i])); - } - } - } - - insertTabletStatement.setTimes(times); - insertTabletStatement.setColumns(columns); - insertTabletStatement.setBitMaps(bitMaps); - - return insertTabletStatement; - } - - public int getWrittenCount(InputLocation inputLocation) { - if (!writtenCounter.containsKey(inputLocation)) { - return -1; - } - return writtenCounter.get(inputLocation).get(); - } - } }
