This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch rel/1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 53a1329203e8a7a81a42441d4437fb3bef6d451b Author: Itami Sho <[email protected]> AuthorDate: Sun Jun 4 16:53:37 2023 +0800 [IOTDB-5967] Pipe: fix convertToTablet bug and introduce PipeEmptyTabletInsertionEvent (#10044) Co-authored-by: Steve Yurong Su <[email protected]> (cherry picked from commit 387c2102600273842a1e8578c79455366581b540) --- .../iotdb/pipe/api/collector/RowCollector.java | 5 +- .../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 38 +++-- .../event/impl/PipeEmptyTabletInsertionEvent.java | 28 ++-- ...ava => PipeInsertNodeTabletInsertionEvent.java} | 31 +++- ...nt.java => PipeTabletTabletInsertionEvent.java} | 6 +- .../realtime/PipeRealtimeCollectEventFactory.java | 6 +- .../core/event/realtime/TsFileEpochManager.java | 6 +- .../db/pipe/core/event/view/access/PipeRow.java | 22 +-- .../event/view/collector/PipeRowCollector.java | 10 +- .../TabletInsertionDataContainer.java | 106 ++++-------- .../TsFileInsertionDataContainer.java | 4 +- .../PipeInsertNodeTabletInsertionEventTest.java | 178 +++++++++++++++++++++ 12 files changed, 296 insertions(+), 144 deletions(-) diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java index bdd96be926b..0518f8b2040 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java @@ -26,9 +26,8 @@ import java.io.IOException; import java.util.function.BiConsumer; /** - * Used to collect rows generated by {@link TabletInsertionEvent#processRowByRow(BiConsumer)},{@link - * TabletInsertionEvent#processByIterator(BiConsumer)} or {@link - * TabletInsertionEvent#processTablet(BiConsumer)}. + * Used to collect rows generated by {@link TabletInsertionEvent#processRowByRow(BiConsumer)}, + * {@link TabletInsertionEvent#processTablet(BiConsumer)}. */ public interface RowCollector { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java index 40c156e3285..c67e3966cc2 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java @@ -33,8 +33,9 @@ import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq; import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq; import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq; -import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent; -import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent; import org.apache.iotdb.db.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.PipeConnector; @@ -117,13 +118,15 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception { // PipeProcessor can change the type of TabletInsertionEvent try { - if (tabletInsertionEvent instanceof PipeInsertNodeInsertionEvent) { - doTransfer((PipeInsertNodeInsertionEvent) tabletInsertionEvent); - } else if (tabletInsertionEvent instanceof PipeTabletInsertionEvent) { - doTransfer((PipeTabletInsertionEvent) tabletInsertionEvent); + if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { + doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent); + } else if (tabletInsertionEvent instanceof PipeTabletTabletInsertionEvent) { + doTransfer((PipeTabletTabletInsertionEvent) tabletInsertionEvent); + } else if (tabletInsertionEvent instanceof PipeEmptyTabletInsertionEvent) { + doTransfer((PipeEmptyTabletInsertionEvent) tabletInsertionEvent); } else { throw new NotImplementedException( - "IoTDBThriftConnectorV1 only support PipeInsertNodeInsertionEvent and PipeTabletInsertionEvent."); + "IoTDBThriftConnectorV1 only support PipeInsertNodeTabletInsertionEvent and PipeTabletTabletInsertionEvent."); } } catch (TException e) { LOGGER.error( @@ -136,35 +139,40 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { } } - private void doTransfer(PipeInsertNodeInsertionEvent pipeInsertNodeInsertionEvent) + private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, TException, WALPipeException { final TPipeTransferResp resp = client.pipeTransfer( PipeTransferInsertNodeReq.toTPipeTransferReq( - pipeInsertNodeInsertionEvent.getInsertNode())); + pipeInsertNodeTabletInsertionEvent.getInsertNode())); if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new PipeException( String.format( - "Transfer PipeInsertNodeInsertionEvent %s error, result status %s", - pipeInsertNodeInsertionEvent, resp.status)); + "Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s", + pipeInsertNodeTabletInsertionEvent, resp.status)); } } - private void doTransfer(PipeTabletInsertionEvent pipeTabletInsertionEvent) + private void doTransfer(PipeTabletTabletInsertionEvent pipeTabletTabletInsertionEvent) throws PipeException, TException, IOException { final TPipeTransferResp resp = client.pipeTransfer( - PipeTransferTabletReq.toTPipeTransferReq(pipeTabletInsertionEvent.convertToTablet())); + PipeTransferTabletReq.toTPipeTransferReq( + pipeTabletTabletInsertionEvent.convertToTablet())); if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new PipeException( String.format( - "Transfer PipeTabletInsertionEvent %s error, result status %s", - pipeTabletInsertionEvent, resp.status)); + "Transfer PipeTabletTabletInsertionEvent %s error, result status %s", + pipeTabletTabletInsertionEvent, resp.status)); } } + private void doTransfer(PipeEmptyTabletInsertionEvent pipeEmptyTabletInsertionEvent) { + // do nothing + } + @Override public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception { // PipeProcessor can change the type of TabletInsertionEvent diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java similarity index 65% copy from iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java copy to server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java index bdd96be926b..855da8fa8b0 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/collector/RowCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java @@ -17,27 +17,23 @@ * under the License. */ -package org.apache.iotdb.pipe.api.collector; +package org.apache.iotdb.db.pipe.core.event.impl; import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.tsfile.write.record.Tablet; -import java.io.IOException; import java.util.function.BiConsumer; -/** - * Used to collect rows generated by {@link TabletInsertionEvent#processRowByRow(BiConsumer)},{@link - * TabletInsertionEvent#processByIterator(BiConsumer)} or {@link - * TabletInsertionEvent#processTablet(BiConsumer)}. - */ -public interface RowCollector { +public class PipeEmptyTabletInsertionEvent implements TabletInsertionEvent { + @Override + public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) { + return this; + } - /** - * Collects a row. - * - * @param row Row to be collected - * @throws IOException if any I/O errors occur - * @see Row - */ - void collectRow(Row row) throws IOException; + @Override + public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) { + return this; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java similarity index 83% rename from server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java rename to server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java index 3b980f10b9d..416fea8b734 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.impl; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.pipe.core.event.EnrichedEvent; import org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer; @@ -38,21 +39,23 @@ import org.slf4j.LoggerFactory; import java.util.function.BiConsumer; -public class PipeInsertNodeInsertionEvent extends EnrichedEvent implements TabletInsertionEvent { +public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent + implements TabletInsertionEvent { - private static final Logger LOGGER = LoggerFactory.getLogger(PipeInsertNodeInsertionEvent.class); + private static final Logger LOGGER = + LoggerFactory.getLogger(PipeInsertNodeTabletInsertionEvent.class); private final WALEntryHandler walEntryHandler; private final ProgressIndex progressIndex; private TabletInsertionDataContainer dataContainer; - public PipeInsertNodeInsertionEvent( + public PipeInsertNodeTabletInsertionEvent( WALEntryHandler walEntryHandler, ProgressIndex progressIndex) { this(walEntryHandler, progressIndex, null, null); } - private PipeInsertNodeInsertionEvent( + private PipeInsertNodeTabletInsertionEvent( WALEntryHandler walEntryHandler, ProgressIndex progressIndex, PipeTaskMeta pipeTaskMeta, @@ -104,9 +107,10 @@ public class PipeInsertNodeInsertionEvent extends EnrichedEvent implements Table } @Override - public PipeInsertNodeInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( + public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( PipeTaskMeta pipeTaskMeta, String pattern) { - return new PipeInsertNodeInsertionEvent(walEntryHandler, progressIndex, pipeTaskMeta, pattern); + return new PipeInsertNodeTabletInsertionEvent( + walEntryHandler, progressIndex, pipeTaskMeta, pattern); } /////////////////////////// TabletInsertionEvent /////////////////////////// @@ -149,11 +153,24 @@ public class PipeInsertNodeInsertionEvent extends EnrichedEvent implements Table } } + @TestOnly + public Tablet convertToTabletForTest(InsertNode insertNode, String pattern) { + try { + if (dataContainer == null) { + dataContainer = new TabletInsertionDataContainer(insertNode, pattern); + } + return dataContainer.convertToTablet(); + } catch (Exception e) { + LOGGER.error("Process tablet error.", e); + throw new PipeException("Process tablet error.", e); + } + } + /////////////////////////// Object /////////////////////////// @Override public String toString() { - return "PipeTabletInsertionEvent{" + return "PipeTabletTabletInsertionEvent{" + "walEntryHandler=" + walEntryHandler + ", progressIndex=" diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java similarity index 92% rename from server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java rename to server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java index 399b091264d..014972abd7f 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java @@ -29,18 +29,18 @@ import org.apache.iotdb.tsfile.write.record.Tablet; import java.util.Objects; import java.util.function.BiConsumer; -public class PipeTabletInsertionEvent implements TabletInsertionEvent { +public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent { private final Tablet tablet; private final String pattern; private TabletInsertionDataContainer dataContainer; - public PipeTabletInsertionEvent(Tablet tablet) { + public PipeTabletTabletInsertionEvent(Tablet tablet) { this(Objects.requireNonNull(tablet), null); } - public PipeTabletInsertionEvent(Tablet tablet, String pattern) { + public PipeTabletTabletInsertionEvent(Tablet tablet, String pattern) { this.tablet = Objects.requireNonNull(tablet); this.pattern = pattern; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java index 61d65cd43e8..b2a622f7c05 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEventFactory.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.realtime; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; -import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent; import org.apache.iotdb.db.wal.utils.WALEntryHandler; @@ -36,8 +36,8 @@ public class PipeRealtimeCollectEventFactory { public static PipeRealtimeCollectEvent createCollectEvent( WALEntryHandler walEntryHandler, InsertNode insertNode, TsFileResource resource) { - return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeInsertionEvent( - new PipeInsertNodeInsertionEvent(walEntryHandler, insertNode.getProgressIndex()), + return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent( + new PipeInsertNodeTabletInsertionEvent(walEntryHandler, insertNode.getProgressIndex()), insertNode, resource); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java index 3e09b86e74f..5a33649aad2 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/TsFileEpochManager.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.core.event.realtime; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; -import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent; import org.slf4j.Logger; @@ -59,8 +59,8 @@ public class TsFileEpochManager { event.getPattern()); } - public PipeRealtimeCollectEvent bindPipeInsertNodeInsertionEvent( - PipeInsertNodeInsertionEvent event, InsertNode node, TsFileResource resource) { + public PipeRealtimeCollectEvent bindPipeInsertNodeTabletInsertionEvent( + PipeInsertNodeTabletInsertionEvent event, InsertNode node, TsFileResource resource) { return new PipeRealtimeCollectEvent( event, filePath2Epoch.computeIfAbsent(resource.getTsFilePath(), TsFileEpoch::new), diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java index 043be5ec83f..dfab1e1aa2b 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java @@ -39,7 +39,7 @@ public class PipeRow implements Row { private final MeasurementSchema[] measurementSchemaList; private final long[] timestampColumn; - private final Object[][] valueColumns; + private final Object[] valueColumns; private final TSDataType[] valueColumnTypes; private final String[] columnNameStringList; @@ -49,7 +49,7 @@ public class PipeRow implements Row { String deviceId, MeasurementSchema[] measurementSchemaList, long[] timestampColumn, - Object[][] valueColumns, + Object[] valueColumns, TSDataType[] valueColumnTypes, String[] columnNameStringList) { this.rowIndex = rowIndex; @@ -68,42 +68,42 @@ public class PipeRow implements Row { @Override public int getInt(int columnIndex) { - return (int) valueColumns[columnIndex][rowIndex]; + return ((int[]) valueColumns[columnIndex])[rowIndex]; } @Override public long getLong(int columnIndex) { - return (long) valueColumns[columnIndex][rowIndex]; + return ((long[]) valueColumns[columnIndex])[rowIndex]; } @Override public float getFloat(int columnIndex) { - return (float) valueColumns[columnIndex][rowIndex]; + return ((float[]) valueColumns[columnIndex])[rowIndex]; } @Override public double getDouble(int columnIndex) { - return (double) valueColumns[columnIndex][rowIndex]; + return ((double[]) valueColumns[columnIndex])[rowIndex]; } @Override public boolean getBoolean(int columnIndex) { - return (boolean) valueColumns[columnIndex][rowIndex]; + return ((boolean[]) valueColumns[columnIndex])[rowIndex]; } @Override public Binary getBinary(int columnIndex) { - return Binary.valueOf((String) valueColumns[columnIndex][rowIndex]); + return ((Binary[]) valueColumns[columnIndex])[rowIndex]; } @Override public String getString(int columnIndex) { - return (String) valueColumns[columnIndex][rowIndex]; + return ((Binary[]) valueColumns[columnIndex])[rowIndex].getStringValue(); } @Override public Object getObject(int columnIndex) { - return valueColumns[columnIndex][rowIndex]; + return ((Object[]) valueColumns[columnIndex])[rowIndex]; } @Override @@ -113,7 +113,7 @@ public class PipeRow implements Row { @Override public boolean isNull(int columnIndex) { - return valueColumns[columnIndex][rowIndex] == null; + return ((Object[]) valueColumns[columnIndex])[rowIndex] == null; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java index 3d39e08c32a..ab0371252f3 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java @@ -19,7 +19,8 @@ package org.apache.iotdb.db.pipe.core.event.view.collector; -import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; @@ -65,7 +66,12 @@ public class PipeRowCollector implements RowCollector { } public TabletInsertionEvent toTabletInsertionEvent() { - PipeTabletInsertionEvent tabletInsertionEvent = new PipeTabletInsertionEvent(tablet); + if (tablet == null) { + return new PipeEmptyTabletInsertionEvent(); + } + + PipeTabletTabletInsertionEvent tabletInsertionEvent = + new PipeTabletTabletInsertionEvent(tablet); this.tablet = null; return tabletInsertionEvent; } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java index f61d4f623ad..16d21f73138 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow; import org.apache.iotdb.db.pipe.core.event.view.collector.PipeRowCollector; import org.apache.iotdb.pipe.api.access.Row; @@ -51,7 +52,7 @@ public class TabletInsertionDataContainer { private String[] columnNameStringList; private long[] timestampColumn; - private Object[][] valueColumns; + private Object[] valueColumns; private TSDataType[] valueColumnTypes; private BitMap[] nullValueColumnBitmaps; private int rowCount; @@ -97,7 +98,7 @@ public class TabletInsertionDataContainer { this.measurementSchemaList = new MeasurementSchema[filteredColumnSize]; this.columnNameStringList = new String[filteredColumnSize]; - this.valueColumns = new Object[filteredColumnSize][1]; + this.valueColumns = new Object[filteredColumnSize]; this.valueColumnTypes = new TSDataType[filteredColumnSize]; this.nullValueColumnBitmaps = new BitMap[filteredColumnSize]; @@ -111,7 +112,7 @@ public class TabletInsertionDataContainer { final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i]; this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; - this.valueColumns[filteredColumnIndex][0] = originValueColumns[i]; + this.valueColumns[filteredColumnIndex] = originValueColumns[i]; this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; this.nullValueColumnBitmaps[filteredColumnIndex] = new BitMap(1); } @@ -139,7 +140,7 @@ public class TabletInsertionDataContainer { this.measurementSchemaList = new MeasurementSchema[filteredColumnSize]; this.columnNameStringList = new String[filteredColumnSize]; - this.valueColumns = new Object[filteredColumnSize][]; + this.valueColumns = new Object[filteredColumnSize]; this.valueColumnTypes = new TSDataType[filteredColumnSize]; this.nullValueColumnBitmaps = new BitMap[filteredColumnSize]; @@ -188,7 +189,7 @@ public class TabletInsertionDataContainer { this.measurementSchemaList = new MeasurementSchema[filteredColumnSize]; this.columnNameStringList = new String[filteredColumnSize]; - this.valueColumns = new Object[filteredColumnSize][]; + this.valueColumns = new Object[filteredColumnSize]; this.valueColumnTypes = new TSDataType[filteredColumnSize]; this.nullValueColumnBitmaps = new BitMap[filteredColumnSize]; @@ -269,51 +270,53 @@ public class TabletInsertionDataContainer { originMeasurementList, pattern, originColumnIndex2FilteredColumnIndexMapperList); } - private Object[] convertToColumn(Object originColumn, TSDataType dataType, BitMap bitMap) { + private Object convertToColumn(Object originColumn, TSDataType dataType, BitMap bitMap) { switch (dataType) { case INT32: final int[] intValues = (int[]) originColumn; - final Integer[] integerValues = new Integer[intValues.length]; + final int[] integerValues = new int[intValues.length]; for (int i = 0; i < intValues.length; i++) { - integerValues[i] = bitMap != null && bitMap.isMarked(i) ? null : intValues[i]; + integerValues[i] = bitMap != null && bitMap.isMarked(i) ? 0 : intValues[i]; } return integerValues; case INT64: final long[] longValues = (long[]) originColumn; - final Long[] longValues2 = new Long[longValues.length]; + final long[] longValues2 = new long[longValues.length]; for (int i = 0; i < longValues.length; i++) { - longValues2[i] = bitMap != null && bitMap.isMarked(i) ? null : longValues[i]; + longValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : longValues[i]; } return longValues2; case FLOAT: final float[] floatValues = (float[]) originColumn; - final Float[] floatValues2 = new Float[floatValues.length]; + final float[] floatValues2 = new float[floatValues.length]; for (int i = 0; i < floatValues.length; i++) { - floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? null : floatValues[i]; + floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : floatValues[i]; } return floatValues2; case DOUBLE: final double[] doubleValues = (double[]) originColumn; - final Double[] doubleValues2 = new Double[doubleValues.length]; + final double[] doubleValues2 = new double[doubleValues.length]; for (int i = 0; i < doubleValues.length; i++) { - doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? null : doubleValues[i]; + doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : doubleValues[i]; } return doubleValues2; case BOOLEAN: final boolean[] booleanValues = (boolean[]) originColumn; - final Boolean[] booleanValues2 = new Boolean[booleanValues.length]; + final boolean[] booleanValues2 = new boolean[booleanValues.length]; for (int i = 0; i < booleanValues.length; i++) { - booleanValues2[i] = bitMap != null && bitMap.isMarked(i) ? null : booleanValues[i]; + booleanValues2[i] = (bitMap == null || !bitMap.isMarked(i)) && booleanValues[i]; } return booleanValues2; case TEXT: final Binary[] binaryValues = (Binary[]) originColumn; - final String[] stringValues = new String[binaryValues.length]; + final Binary[] stringValues = new Binary[binaryValues.length]; for (int i = 0; i < binaryValues.length; i++) { stringValues[i] = bitMap != null && bitMap.isMarked(i) ? null - : (binaryValues[i] == null ? null : binaryValues[i].getStringValue()); + : (binaryValues[i] == null + ? null + : Binary.valueOf(binaryValues[i].getStringValue())); } return stringValues; default: @@ -326,6 +329,10 @@ public class TabletInsertionDataContainer { public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) { final PipeRowCollector rowCollector = new PipeRowCollector(); + if (valueColumns.length == 0) { + return new PipeEmptyTabletInsertionEvent(); + } + for (int i = 0; i < timestampColumn.length; i++) { consumer.accept( new PipeRow( @@ -355,76 +362,17 @@ public class TabletInsertionDataContainer { } final int columnSize = measurementSchemaList.length; - final int rowSize = valueColumns[0].length; final List<MeasurementSchema> measurementSchemaArrayList = new ArrayList<>(Arrays.asList(measurementSchemaList).subList(0, columnSize)); - final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList, rowSize); + final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList, rowCount); newTablet.timestamps = timestampColumn; newTablet.bitMaps = nullValueColumnBitmaps; - newTablet.values = squashFromColumnList(valueColumns, valueColumnTypes); + newTablet.values = valueColumns; newTablet.rowSize = rowCount; tablet = newTablet; return tablet; } - - private Object[] squashFromColumnList(Object[][] valueColumns, TSDataType[] valueColumnTypes) { - final Object[] values = new Object[valueColumns.length]; - for (int i = 0; i < valueColumns.length; i++) { - values[i] = squashFromColumn(valueColumns[i], valueColumnTypes[i]); - } - return values; - } - - private Object squashFromColumn(Object[] valueColumn, TSDataType valueColumnType) { - switch (valueColumnType) { - case INT32: - final Integer[] intValues = (Integer[]) valueColumn; - final int[] intValues2 = new int[intValues.length]; - for (int i = 0; i < intValues.length; i++) { - intValues2[i] = intValues[i] == null ? 0 : intValues[i]; - } - return intValues2; - case INT64: - final Long[] longValues = (Long[]) valueColumn; - final long[] longValues2 = new long[longValues.length]; - for (int i = 0; i < longValues.length; i++) { - longValues2[i] = longValues[i] == null ? 0 : longValues[i]; - } - return longValues2; - case FLOAT: - final Float[] floatValues = (Float[]) valueColumn; - final float[] floatValues2 = new float[floatValues.length]; - for (int i = 0; i < floatValues.length; i++) { - floatValues2[i] = floatValues[i] == null ? 0 : floatValues[i]; - } - return floatValues2; - case DOUBLE: - final Double[] doubleValues = (Double[]) valueColumn; - final double[] doubleValues2 = new double[doubleValues.length]; - for (int i = 0; i < doubleValues.length; i++) { - doubleValues2[i] = doubleValues[i] == null ? 0 : doubleValues[i]; - } - return doubleValues2; - case BOOLEAN: - final Boolean[] booleanValues = (Boolean[]) valueColumn; - final boolean[] booleanValues2 = new boolean[booleanValues.length]; - for (int i = 0; i < booleanValues.length; i++) { - booleanValues2[i] = booleanValues[i] != null && booleanValues[i]; - } - return booleanValues2; - case TEXT: - final String[] stringValues = (String[]) valueColumn; - final Binary[] binaryValues = new Binary[stringValues.length]; - for (int i = 0; i < stringValues.length; i++) { - binaryValues[i] = stringValues[i] == null ? null : Binary.valueOf(stringValues[i]); - } - return binaryValues; - default: - throw new UnSupportedDataTypeException( - String.format("Data type %s is not supported.", valueColumnType)); - } - } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java index 260814d609f..9035a8fe0bb 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.pipe.core.event.view.datastructure; -import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; @@ -124,7 +124,7 @@ public class TsFileInsertionDataContainer { @Override public TabletInsertionEvent next() { - return new PipeTabletInsertionEvent(tabletIterator.next()); + return new PipeTabletTabletInsertionEvent(tabletIterator.next()); } }; } diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java new file mode 100644 index 00000000000..5a211fc2cff --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.core.event; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.BitMap; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; + +public class PipeInsertNodeTabletInsertionEventTest { + + InsertRowNode insertRowNode; + InsertTabletNode insertTabletNode; + + final String deviceId = "root.sg.d1"; + final long[] times = new long[] {110L, 111L, 112L, 113L, 114L}; + final String[] measurementIds = new String[] {"s1", "s2", "s3", "s4", "s5", "s6"}; + final TSDataType[] dataTypes = + new TSDataType[] { + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.BOOLEAN, + TSDataType.TEXT + }; + + final MeasurementSchema[] schemas = new MeasurementSchema[6]; + final Object[] values = new Object[6]; + + final String pattern = "root.sg.d1"; + + Tablet tabletForInsertRowNode; + Tablet tabletForInsertTabletNode; + + @Before + public void setUp() throws Exception { + createMeasurementSchema(); + createInsertRowNode(); + createInsertTabletNode(); + createTablet(); + } + + private void createMeasurementSchema() { + for (int i = 0; i < 6; i++) { + schemas[i] = new MeasurementSchema(measurementIds[i], dataTypes[i]); + } + } + + private void createInsertRowNode() throws IllegalPathException { + + insertRowNode = + new InsertRowNode( + new PlanNodeId("plan node 1"), + new PartialPath(deviceId), + false, + measurementIds, + dataTypes, + schemas, + times[0], + values, + false); + } + + private void createInsertTabletNode() throws IllegalPathException { + this.insertTabletNode = + new InsertTabletNode( + new PlanNodeId("plannode 1"), + new PartialPath(deviceId), + false, + measurementIds, + dataTypes, + schemas, + times, + null, + values, + times.length); + } + + private void createTablet() { + + // create tablet for insertRowNode + BitMap[] bitMapsForInsertRowNode = new BitMap[6]; + for (int i = 0; i < 6; i++) { + bitMapsForInsertRowNode[i] = new BitMap(1); + } + + values[0] = new int[1]; + values[1] = new long[1]; + values[2] = new float[1]; + values[3] = new double[1]; + values[4] = new boolean[1]; + values[5] = new Binary[1]; + + for (int r = 0; r < 1; r++) { + ((int[]) values[0])[r] = 100; + ((long[]) values[1])[r] = 10000; + ((float[]) values[2])[r] = 2; + ((double[]) values[3])[r] = 1.0; + ((boolean[]) values[4])[r] = false; + ((Binary[]) values[5])[r] = Binary.valueOf("text"); + } + + tabletForInsertRowNode = new Tablet(deviceId, Arrays.asList(schemas), 1); + tabletForInsertRowNode.values = values; + tabletForInsertRowNode.timestamps = new long[] {times[0]}; + tabletForInsertRowNode.rowSize = 1; + tabletForInsertRowNode.bitMaps = bitMapsForInsertRowNode; + + // create tablet for insertTabletNode + BitMap[] bitMapsForInsertTabletNode = new BitMap[6]; + for (int i = 0; i < 6; i++) { + bitMapsForInsertTabletNode[i] = new BitMap(times.length); + } + + values[0] = new int[5]; + values[1] = new long[5]; + values[2] = new float[5]; + values[3] = new double[5]; + values[4] = new boolean[5]; + values[5] = new Binary[5]; + + for (int r = 0; r < 5; r++) { + ((int[]) values[0])[r] = 100; + ((long[]) values[1])[r] = 10000; + ((float[]) values[2])[r] = 2; + ((double[]) values[3])[r] = 1.0; + ((boolean[]) values[4])[r] = false; + ((Binary[]) values[5])[r] = Binary.valueOf("text"); + } + tabletForInsertTabletNode = new Tablet(deviceId, Arrays.asList(schemas), times.length); + tabletForInsertTabletNode.values = values; + tabletForInsertTabletNode.timestamps = times; + tabletForInsertTabletNode.rowSize = times.length; + tabletForInsertTabletNode.bitMaps = bitMapsForInsertTabletNode; + } + + @Test + public void convertToTabletForTest() { + PipeInsertNodeTabletInsertionEvent event1 = new PipeInsertNodeTabletInsertionEvent(null, null); + Tablet tablet1 = event1.convertToTabletForTest(insertRowNode, pattern); + Assert.assertEquals(tablet1, tabletForInsertRowNode); + + PipeInsertNodeTabletInsertionEvent event2 = new PipeInsertNodeTabletInsertionEvent(null, null); + Tablet tablet2 = event2.convertToTabletForTest(insertTabletNode, pattern); + Assert.assertEquals(tablet2, tabletForInsertTabletNode); + } +}
