This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-tablet-covert in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c449f57bc0010229dd7a214e51600b3e5536c0f0 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Jun 5 20:11:30 2023 +0800 fix: insertRecordNode column convertion bug --- .../event/dml/insertion/TabletInsertionEvent.java | 12 +- .../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 19 +-- .../event/impl/PipeEmptyTabletInsertionEvent.java | 39 ----- .../impl/PipeInsertNodeTabletInsertionEvent.java | 16 +- ...Event.java => PipeRawTabletInsertionEvent.java} | 20 ++- .../db/pipe/core/event/view/access/PipeRow.java | 12 +- .../event/view/collector/PipeRowCollector.java | 23 +-- .../TabletInsertionDataContainer.java | 176 +++++++++------------ .../TsFileInsertionDataContainer.java | 4 +- .../core/processor/PipeDoNothingProcessor.java | 14 +- 10 files changed, 138 insertions(+), 197 deletions(-) diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java index 4a8073a5a26..09b129a9cc4 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java @@ -32,16 +32,16 @@ public interface TabletInsertionEvent extends Event { /** * The consumer processes the data row by row and collects the results by RowCollector. * - * @return TabletInsertionEvent a new TabletInsertionEvent contains the results collected by the - * RowCollector + * @return Iterable<TabletInsertionEvent> a list of new TabletInsertionEvent contains the results + * collected by the RowCollector */ - TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer); + Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer); /** * The consumer processes the Tablet directly and collects the results by RowCollector. * - * @return TabletInsertionEvent a new TabletInsertionEvent contains the results collected by the - * RowCollector + * @return Iterable<TabletInsertionEvent> a list of new TabletInsertionEvent contains the results + * collected by the RowCollector */ - TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer); + Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java index c67e3966cc2..1636d0fdc02 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java @@ -33,9 +33,8 @@ import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq; import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq; import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq; -import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent; -import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent; import org.apache.iotdb.db.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.PipeConnector; @@ -120,10 +119,8 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { try { if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent); - } else if (tabletInsertionEvent instanceof PipeTabletTabletInsertionEvent) { - doTransfer((PipeTabletTabletInsertionEvent) tabletInsertionEvent); - } else if (tabletInsertionEvent instanceof PipeEmptyTabletInsertionEvent) { - doTransfer((PipeEmptyTabletInsertionEvent) tabletInsertionEvent); + } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) { + doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent); } else { throw new NotImplementedException( "IoTDBThriftConnectorV1 only support PipeInsertNodeTabletInsertionEvent and PipeTabletTabletInsertionEvent."); @@ -154,25 +151,21 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { } } - private void doTransfer(PipeTabletTabletInsertionEvent pipeTabletTabletInsertionEvent) + private void doTransfer(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException, TException, IOException { final TPipeTransferResp resp = client.pipeTransfer( PipeTransferTabletReq.toTPipeTransferReq( - pipeTabletTabletInsertionEvent.convertToTablet())); + pipeRawTabletInsertionEvent.convertToTablet())); if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new PipeException( String.format( "Transfer PipeTabletTabletInsertionEvent %s error, result status %s", - pipeTabletTabletInsertionEvent, resp.status)); + pipeRawTabletInsertionEvent, resp.status)); } } - private void doTransfer(PipeEmptyTabletInsertionEvent pipeEmptyTabletInsertionEvent) { - // do nothing - } - @Override public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception { // PipeProcessor can change the type of TabletInsertionEvent diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java deleted file mode 100644 index 855da8fa8b0..00000000000 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.core.event.impl; - -import org.apache.iotdb.pipe.api.access.Row; -import org.apache.iotdb.pipe.api.collector.RowCollector; -import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; -import org.apache.iotdb.tsfile.write.record.Tablet; - -import java.util.function.BiConsumer; - -public class PipeEmptyTabletInsertionEvent implements TabletInsertionEvent { - @Override - public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) { - return this; - } - - @Override - public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) { - return this; - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java index 416fea8b734..d9a895acbae 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java @@ -116,7 +116,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent /////////////////////////// TabletInsertionEvent /////////////////////////// @Override - public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) { + public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer) { try { if (dataContainer == null) { dataContainer = new TabletInsertionDataContainer(getInsertNode(), getPattern()); @@ -129,7 +129,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent } @Override - public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) { + public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer) { try { if (dataContainer == null) { dataContainer = new TabletInsertionDataContainer(getInsertNode(), getPattern()); @@ -141,18 +141,6 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent } } - public Tablet convertToTablet() { - try { - if (dataContainer == null) { - dataContainer = new TabletInsertionDataContainer(getInsertNode(), getPattern()); - } - return dataContainer.convertToTablet(); - } catch (Exception e) { - LOGGER.error("Process tablet error.", e); - throw new PipeException("Process tablet error.", e); - } - } - @TestOnly public Tablet convertToTabletForTest(InsertNode insertNode, String pattern) { try { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java similarity index 74% rename from server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java rename to server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java index 014972abd7f..4343a18aa5d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java @@ -29,18 +29,18 @@ import org.apache.iotdb.tsfile.write.record.Tablet; import java.util.Objects; import java.util.function.BiConsumer; -public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent { +public class PipeRawTabletInsertionEvent implements TabletInsertionEvent { private final Tablet tablet; private final String pattern; private TabletInsertionDataContainer dataContainer; - public PipeTabletTabletInsertionEvent(Tablet tablet) { + public PipeRawTabletInsertionEvent(Tablet tablet) { this(Objects.requireNonNull(tablet), null); } - public PipeTabletTabletInsertionEvent(Tablet tablet, String pattern) { + private PipeRawTabletInsertionEvent(Tablet tablet, String pattern) { this.tablet = Objects.requireNonNull(tablet); this.pattern = pattern; } @@ -52,7 +52,7 @@ public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent { /////////////////////////// TabletInsertionEvent /////////////////////////// @Override - public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) { + public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer) { if (dataContainer == null) { dataContainer = new TabletInsertionDataContainer(tablet, getPattern()); } @@ -60,7 +60,7 @@ public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent { } @Override - public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) { + public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer) { if (dataContainer == null) { dataContainer = new TabletInsertionDataContainer(tablet, getPattern()); } @@ -68,8 +68,16 @@ public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent { } public Tablet convertToTablet() { + final String pattern = getPattern(); + + // if pattern is "root", we don't need to convert, just return the original tablet + if (pattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { + return tablet; + } + + // if pattern is not "root", we need to convert the tablet if (dataContainer == null) { - dataContainer = new TabletInsertionDataContainer(tablet, getPattern()); + dataContainer = new TabletInsertionDataContainer(tablet, pattern); } return dataContainer.convertToTablet(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java index dfab1e1aa2b..4a8bd65bd30 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java @@ -26,6 +26,7 @@ import org.apache.iotdb.pipe.api.type.Binary; import org.apache.iotdb.pipe.api.type.Type; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import java.util.Arrays; @@ -39,8 +40,9 @@ public class PipeRow implements Row { private final MeasurementSchema[] measurementSchemaList; private final long[] timestampColumn; - private final Object[] valueColumns; private final TSDataType[] valueColumnTypes; + private final Object[] valueColumns; + private final BitMap[] bitMaps; private final String[] columnNameStringList; @@ -49,15 +51,17 @@ public class PipeRow implements Row { String deviceId, MeasurementSchema[] measurementSchemaList, long[] timestampColumn, - Object[] valueColumns, TSDataType[] valueColumnTypes, + Object[] valueColumns, + BitMap[] bitMaps, String[] columnNameStringList) { this.rowIndex = rowIndex; this.deviceId = deviceId; this.measurementSchemaList = measurementSchemaList; this.timestampColumn = timestampColumn; - this.valueColumns = valueColumns; this.valueColumnTypes = valueColumnTypes; + this.valueColumns = valueColumns; + this.bitMaps = bitMaps; this.columnNameStringList = columnNameStringList; } @@ -113,7 +117,7 @@ public class PipeRow implements Row { @Override public boolean isNull(int columnIndex) { - return ((Object[]) valueColumns[columnIndex])[rowIndex] == null; + return bitMaps[columnIndex].isMarked(rowIndex); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java index ab0371252f3..8010b67e57e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java @@ -19,8 +19,7 @@ package org.apache.iotdb.db.pipe.core.event.view.collector; -import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent; -import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; @@ -35,6 +34,7 @@ import java.util.List; public class PipeRowCollector implements RowCollector { + private final List<TabletInsertionEvent> tabletInsertionEventList = new ArrayList<>(); private Tablet tablet = null; @Override @@ -63,16 +63,21 @@ public class PipeRowCollector implements RowCollector { } } tablet.rowSize++; - } - public TabletInsertionEvent toTabletInsertionEvent() { - if (tablet == null) { - return new PipeEmptyTabletInsertionEvent(); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + collectTabletInsertionEvent(); } + } - PipeTabletTabletInsertionEvent tabletInsertionEvent = - new PipeTabletTabletInsertionEvent(tablet); + private void collectTabletInsertionEvent() { + if (tablet != null) { + tabletInsertionEventList.add(new PipeRawTabletInsertionEvent(tablet)); + } this.tablet = null; - return tabletInsertionEvent; + } + + public Iterable<TabletInsertionEvent> convertToTabletInsertionEvents() { + collectTabletInsertionEvent(); + return tabletInsertionEventList; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java index 16d21f73138..56a6bc49632 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode; -import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow; import org.apache.iotdb.db.pipe.core.event.view.collector.PipeRowCollector; import org.apache.iotdb.pipe.api.access.Row; @@ -40,6 +39,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.function.BiConsumer; @@ -52,8 +52,9 @@ public class TabletInsertionDataContainer { private String[] columnNameStringList; private long[] timestampColumn; - private Object[] valueColumns; private TSDataType[] valueColumnTypes; + // each column of Object[] is a column of primitive type array + private Object[] valueColumns; private BitMap[] nullValueColumnBitmaps; private int rowCount; @@ -89,7 +90,8 @@ public class TabletInsertionDataContainer { this.timestampColumn = new long[] {insertRowNode.getTime()}; generateColumnIndexMapper( - insertRowNode, pattern, originColumnIndex2FilteredColumnIndexMapperList); + insertRowNode.getMeasurements(), pattern, originColumnIndex2FilteredColumnIndexMapperList); + final int filteredColumnSize = Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList) .filter(Objects::nonNull) @@ -98,22 +100,46 @@ public class TabletInsertionDataContainer { this.measurementSchemaList = new MeasurementSchema[filteredColumnSize]; this.columnNameStringList = new String[filteredColumnSize]; - this.valueColumns = new Object[filteredColumnSize]; this.valueColumnTypes = new TSDataType[filteredColumnSize]; + this.valueColumns = new Object[filteredColumnSize]; this.nullValueColumnBitmaps = new BitMap[filteredColumnSize]; final MeasurementSchema[] originMeasurementSchemaList = insertRowNode.getMeasurementSchemas(); final String[] originColumnNameStringList = insertRowNode.getMeasurements(); - final Object[] originValueColumns = insertRowNode.getValues(); final TSDataType[] originValueColumnTypes = insertRowNode.getDataTypes(); + final Object[] originValueColumns = insertRowNode.getValues(); for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) { final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i]; this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; - this.valueColumns[filteredColumnIndex] = originValueColumns[i]; this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; + switch (originValueColumnTypes[i]) { + case INT32: + this.valueColumns[filteredColumnIndex] = new int[] {(Integer) originValueColumns[i]}; + break; + case INT64: + this.valueColumns[filteredColumnIndex] = new long[] {(Long) originValueColumns[i]}; + break; + case FLOAT: + this.valueColumns[filteredColumnIndex] = new float[] {(Float) originValueColumns[i]}; + break; + case DOUBLE: + this.valueColumns[filteredColumnIndex] = new double[] {(Double) originValueColumns[i]}; + break; + case BOOLEAN: + this.valueColumns[filteredColumnIndex] = + new boolean[] {(Boolean) originValueColumns[i]}; + break; + case TEXT: + this.valueColumns[filteredColumnIndex] = new Binary[] {(Binary) originValueColumns[i]}; + break; + default: + throw new UnSupportedDataTypeException( + String.format( + "Data type %s is not supported.", originValueColumnTypes[i].toString())); + } this.nullValueColumnBitmaps[filteredColumnIndex] = new BitMap(1); } } @@ -130,7 +156,9 @@ public class TabletInsertionDataContainer { this.timestampColumn = insertTabletNode.getTimes(); generateColumnIndexMapper( - insertTabletNode, pattern, originColumnIndex2FilteredColumnIndexMapperList); + insertTabletNode.getMeasurements(), + pattern, + originColumnIndex2FilteredColumnIndexMapperList); final int filteredColumnSize = Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList) @@ -140,15 +168,15 @@ public class TabletInsertionDataContainer { this.measurementSchemaList = new MeasurementSchema[filteredColumnSize]; this.columnNameStringList = new String[filteredColumnSize]; - this.valueColumns = new Object[filteredColumnSize]; this.valueColumnTypes = new TSDataType[filteredColumnSize]; + this.valueColumns = new Object[filteredColumnSize]; this.nullValueColumnBitmaps = new BitMap[filteredColumnSize]; final MeasurementSchema[] originMeasurementSchemaList = insertTabletNode.getMeasurementSchemas(); final String[] originColumnNameStringList = insertTabletNode.getMeasurements(); - final Object[] originValueColumns = insertTabletNode.getColumns(); final TSDataType[] originValueColumnTypes = insertTabletNode.getDataTypes(); + final Object[] originValueColumns = insertTabletNode.getColumns(); final BitMap[] originBitMapList = (insertTabletNode.getBitMaps() == null ? IntStream.range(0, originColumnSize) @@ -156,15 +184,19 @@ public class TabletInsertionDataContainer { .map(o -> new BitMap(timestampColumn.length)) .toArray(BitMap[]::new) : insertTabletNode.getBitMaps()); + for (int i = 0; i < originBitMapList.length; i++) { + if (originBitMapList[i] == null) { + originBitMapList[i] = new BitMap(timestampColumn.length); + } + } for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) { final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i]; this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; - this.valueColumns[filteredColumnIndex] = - convertToColumn(originValueColumns[i], originValueColumnTypes[i], originBitMapList[i]); this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; + this.valueColumns[filteredColumnIndex] = originValueColumns[i]; this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i]; } } @@ -179,7 +211,13 @@ public class TabletInsertionDataContainer { this.deviceId = tablet.deviceId; this.timestampColumn = tablet.timestamps; - generateColumnIndexMapper(tablet, pattern, originColumnIndex2FilteredColumnIndexMapperList); + final List<MeasurementSchema> originMeasurementSchemaList = tablet.getSchemas(); + final String[] originMeasurementList = new String[originMeasurementSchemaList.size()]; + for (int i = 0; i < originMeasurementSchemaList.size(); i++) { + originMeasurementList[i] = originMeasurementSchemaList.get(i).getMeasurementId(); + } + generateColumnIndexMapper( + originMeasurementList, pattern, originColumnIndex2FilteredColumnIndexMapperList); final int filteredColumnSize = Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList) @@ -189,11 +227,10 @@ public class TabletInsertionDataContainer { this.measurementSchemaList = new MeasurementSchema[filteredColumnSize]; this.columnNameStringList = new String[filteredColumnSize]; - this.valueColumns = new Object[filteredColumnSize]; this.valueColumnTypes = new TSDataType[filteredColumnSize]; + this.valueColumns = new Object[filteredColumnSize]; this.nullValueColumnBitmaps = new BitMap[filteredColumnSize]; - final List<MeasurementSchema> originMeasurementSchemaList = tablet.getSchemas(); final String[] originColumnNameStringList = new String[originColumnSize]; final TSDataType[] originValueColumnTypes = new TSDataType[originColumnSize]; for (int i = 0; i < originColumnSize; i++) { @@ -201,16 +238,26 @@ public class TabletInsertionDataContainer { originValueColumnTypes[i] = originMeasurementSchemaList.get(i).getType(); } final Object[] originValueColumns = tablet.values; - final BitMap[] originBitMapList = tablet.bitMaps; + final BitMap[] originBitMapList = + tablet.bitMaps == null + ? IntStream.range(0, originColumnSize) + .boxed() + .map(o -> new BitMap(timestampColumn.length)) + .toArray(BitMap[]::new) + : tablet.bitMaps; + for (int i = 0; i < originBitMapList.length; i++) { + if (originBitMapList[i] == null) { + originBitMapList[i] = new BitMap(timestampColumn.length); + } + } for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) { final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i]; this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList.get(i); this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; - this.valueColumns[filteredColumnIndex] = - convertToColumn(originValueColumns[i], originValueColumnTypes[i], originBitMapList[i]); this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; + this.valueColumns[filteredColumnIndex] = originValueColumns[i]; this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i]; } } @@ -218,6 +265,7 @@ public class TabletInsertionDataContainer { rowCount = tablet.rowSize; } + // TODO: cache the result keyed by deviceId to improve performance private void generateColumnIndexMapper( String[] originMeasurementList, String pattern, @@ -251,88 +299,14 @@ public class TabletInsertionDataContainer { } } - private void generateColumnIndexMapper( - InsertNode insertNode, - String pattern, - Integer[] originColumnIndex2FilteredColumnIndexMapperList) { - generateColumnIndexMapper( - insertNode.getMeasurements(), pattern, originColumnIndex2FilteredColumnIndexMapperList); - } - - private void generateColumnIndexMapper( - Tablet tablet, String pattern, Integer[] originColumnIndex2FilteredColumnIndexMapperList) { - final List<MeasurementSchema> originMeasurementSchemaList = tablet.getSchemas(); - final String[] originMeasurementList = new String[originMeasurementSchemaList.size()]; - for (int i = 0; i < originMeasurementSchemaList.size(); i++) { - originMeasurementList[i] = originMeasurementSchemaList.get(i).getMeasurementId(); - } - generateColumnIndexMapper( - originMeasurementList, pattern, originColumnIndex2FilteredColumnIndexMapperList); - } - - private Object convertToColumn(Object originColumn, TSDataType dataType, BitMap bitMap) { - switch (dataType) { - case INT32: - final int[] intValues = (int[]) originColumn; - final int[] integerValues = new int[intValues.length]; - for (int i = 0; i < intValues.length; i++) { - integerValues[i] = bitMap != null && bitMap.isMarked(i) ? 0 : intValues[i]; - } - return integerValues; - case INT64: - final long[] longValues = (long[]) originColumn; - final long[] longValues2 = new long[longValues.length]; - for (int i = 0; i < longValues.length; i++) { - longValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : longValues[i]; - } - return longValues2; - case FLOAT: - final float[] floatValues = (float[]) originColumn; - final float[] floatValues2 = new float[floatValues.length]; - for (int i = 0; i < floatValues.length; i++) { - floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : floatValues[i]; - } - return floatValues2; - case DOUBLE: - final double[] doubleValues = (double[]) originColumn; - final double[] doubleValues2 = new double[doubleValues.length]; - for (int i = 0; i < doubleValues.length; i++) { - doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : doubleValues[i]; - } - return doubleValues2; - case BOOLEAN: - final boolean[] booleanValues = (boolean[]) originColumn; - final boolean[] booleanValues2 = new boolean[booleanValues.length]; - for (int i = 0; i < booleanValues.length; i++) { - booleanValues2[i] = (bitMap == null || !bitMap.isMarked(i)) && booleanValues[i]; - } - return booleanValues2; - case TEXT: - final Binary[] binaryValues = (Binary[]) originColumn; - final Binary[] stringValues = new Binary[binaryValues.length]; - for (int i = 0; i < binaryValues.length; i++) { - stringValues[i] = - bitMap != null && bitMap.isMarked(i) - ? null - : (binaryValues[i] == null - ? null - : Binary.valueOf(binaryValues[i].getStringValue())); - } - return stringValues; - default: - throw new UnSupportedDataTypeException( - String.format("Data type %s is not supported.", dataType)); - } - } - //////////////////////////// process //////////////////////////// - public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) { - final PipeRowCollector rowCollector = new PipeRowCollector(); - if (valueColumns.length == 0) { - return new PipeEmptyTabletInsertionEvent(); + public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer) { + if (valueColumns.length == 0 || timestampColumn.length == 0) { + return Collections.emptyList(); } + final PipeRowCollector rowCollector = new PipeRowCollector(); for (int i = 0; i < timestampColumn.length; i++) { consumer.accept( new PipeRow( @@ -340,18 +314,19 @@ public class TabletInsertionDataContainer { deviceId, measurementSchemaList, timestampColumn, - valueColumns, valueColumnTypes, + valueColumns, + nullValueColumnBitmaps, columnNameStringList), rowCollector); } - return rowCollector.toTabletInsertionEvent(); + return rowCollector.convertToTabletInsertionEvents(); } - public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) { + public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer) { final PipeRowCollector rowCollector = new PipeRowCollector(); consumer.accept(convertToTablet(), rowCollector); - return rowCollector.toTabletInsertionEvent(); + return rowCollector.convertToTabletInsertionEvents(); } //////////////////////////// convert //////////////////////////// @@ -362,10 +337,8 @@ public class TabletInsertionDataContainer { } final int columnSize = measurementSchemaList.length; - final List<MeasurementSchema> measurementSchemaArrayList = new ArrayList<>(Arrays.asList(measurementSchemaList).subList(0, columnSize)); - final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList, rowCount); newTablet.timestamps = timestampColumn; newTablet.bitMaps = nullValueColumnBitmaps; @@ -373,6 +346,7 @@ public class TabletInsertionDataContainer { newTablet.rowSize = rowCount; tablet = newTablet; + return tablet; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java index 9035a8fe0bb..fe804b31412 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.core.event.view.datastructure; -import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; @@ -124,7 +124,7 @@ public class TsFileInsertionDataContainer { @Override public TabletInsertionEvent next() { - return new PipeTabletTabletInsertionEvent(tabletIterator.next()); + return new PipeRawTabletInsertionEvent(tabletIterator.next()); } }; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java index 62979b7d52c..c9774b4618a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java @@ -56,15 +56,23 @@ public class PipeDoNothingProcessor implements PipeProcessor { .equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { eventCollector.collect(tabletInsertionEvent); } else { - eventCollector.collect( - tabletInsertionEvent.processRowByRow( + tabletInsertionEvent + .processRowByRow( (row, rowCollector) -> { try { rowCollector.collectRow(row); } catch (IOException e) { throw new PipeException("Failed to collect row", e); } - })); + }) + .forEach( + event -> { + try { + eventCollector.collect(event); + } catch (IOException e) { + throw new PipeException("Failed to collect event", e); + } + }); } } else { eventCollector.collect(tabletInsertionEvent);
