This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5973-1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f415a1d9e7b0b28d02201850b697da16f3f25b82 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Jun 7 02:04:24 2023 +0800 [IOTDB-5973] Pipe: fix ClassCastException when using pipe.core.event.view & support collector.pattern in historical collector (#10058) * remove pipe_connector_session_id in config * fix ClassCastException when using pipe.core.event.view * support collector.pattern in historical collector (cherry picked from commit 397f36edafdab5146f3ea9a7cd943fc3c4262e89) --- .../event/dml/insertion/TabletInsertionEvent.java | 12 +- .../apache/iotdb/commons/conf/CommonConfig.java | 9 - .../iotdb/commons/conf/CommonDescriptor.java | 4 - .../iotdb/commons/pipe/config/PipeConfig.java | 4 - .../PipeHistoricalDataRegionTsFileCollector.java | 19 +- .../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 23 +- .../event/impl/PipeEmptyTabletInsertionEvent.java | 39 -- .../impl/PipeInsertNodeTabletInsertionEvent.java | 32 +- ...Event.java => PipeRawTabletInsertionEvent.java} | 20 +- .../core/event/impl/PipeTsFileInsertionEvent.java | 7 +- .../db/pipe/core/event/view/access/PipeRow.java | 32 +- .../event/view/collector/PipeRowCollector.java | 23 +- .../TabletInsertionDataContainer.java | 179 +++--- .../TsFileInsertionDataContainer.java | 145 +++-- .../TsFileInsertionDataTabletIterator.java | 286 +++------- .../core/processor/PipeDoNothingProcessor.java | 14 +- ...Test.java => PipeTabletInsertionEventTest.java} | 47 +- .../event/TsFileInsertionDataContainerTest.java | 608 +++++++++++++++++++++ .../iotdb/tsfile/read/TsFileSequenceReader.java | 18 + 19 files changed, 994 insertions(+), 527 deletions(-) diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java index 4a8073a5a26..09b129a9cc4 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java 
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/dml/insertion/TabletInsertionEvent.java @@ -32,16 +32,16 @@ public interface TabletInsertionEvent extends Event { /** * The consumer processes the data row by row and collects the results by RowCollector. * - * @return TabletInsertionEvent a new TabletInsertionEvent contains the results collected by the - * RowCollector + * @return Iterable<TabletInsertionEvent> a list of new TabletInsertionEvent contains the results + * collected by the RowCollector */ - TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer); + Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer); /** * The consumer processes the Tablet directly and collects the results by RowCollector. * - * @return TabletInsertionEvent a new TabletInsertionEvent contains the results collected by the - * RowCollector + * @return Iterable<TabletInsertionEvent> a list of new TabletInsertionEvent contains the results + * collected by the RowCollector */ - TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer); + Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer); } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 4f70504681e..b01128cefcc 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -156,7 +156,6 @@ public class CommonConfig { private int pipeConnectorReadFileBufferSize = 8388608; private long pipeConnectorRetryIntervalMs = 1000L; private int pipeConnectorPendingQueueSize = 1024; - private long pipeConnectorSessionId = Long.MAX_VALUE / 2; private int pipeHeartbeatLoopCyclesForCollectingPipeMeta = 100; private long pipeMetaSyncerInitialSyncDelayMinutes = 3; @@ -551,12 +550,4 @@ public class 
CommonConfig { this.pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs; } - - public long getPipeConnectorSessionId() { - return pipeConnectorSessionId; - } - - public void setPipeConnectorSessionId(long pipeSessionId) { - this.pipeConnectorSessionId = pipeSessionId; - } } diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index e45c4af45c5..bd13ad3c53b 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -285,10 +285,6 @@ public class CommonDescriptor { properties.getProperty( "pipe_connector_pending_queue_size", String.valueOf(config.getPipeConnectorPendingQueueSize())))); - config.setPipeConnectorSessionId( - Long.parseLong( - properties.getProperty( - "pipe_connector_session_id", String.valueOf(config.getPipeConnectorSessionId())))); config.setPipeHeartbeatLoopCyclesForCollectingPipeMeta( Integer.parseInt( diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index af0fcd379ff..365ea0b8003 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -85,10 +85,6 @@ public class PipeConfig { return COMMON_CONFIG.getPipeConnectorPendingQueueSize(); } - public long getPipeConnectorSessionId() { - return COMMON_CONFIG.getPipeConnectorSessionId(); - } - /////////////////////////////// Meta Consistency /////////////////////////////// public int getHeartbeatLoopCyclesForCollectingPipeMeta() { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java index a1742f9fce3..583cbd5d853 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java @@ -56,6 +56,7 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR private final PipeTaskMeta pipeTaskMeta; private final ProgressIndex startIndex; + private String pattern; private int dataRegionId; private final long historicalDataCollectionTimeLowerBound; @@ -80,6 +81,10 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR @Override public void customize( PipeParameters parameters, PipeCollectorRuntimeConfiguration configuration) { + pattern = + parameters.getStringOrDefault( + PipeCollectorConstant.COLLECTOR_PATTERN_KEY, + PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE); dataRegionId = parameters.getInt(DATA_REGION_KEY); historicalDataCollectionStartTime = parameters.hasAttribute(COLLECTOR_HISTORY_START_TIME) @@ -153,12 +158,7 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterCollectionTimeLowerBound(resource)) - .map( - resource -> - new PipeTsFileInsertionEvent( - resource, - pipeTaskMeta, - PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) + .map(resource -> new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern)) .collect(Collectors.toList())); pendingQueue.addAll( tsFileManager.getTsFileList(false).stream() @@ -167,12 +167,7 @@ public class PipeHistoricalDataRegionTsFileCollector extends PipeHistoricalDataR !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && 
isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterCollectionTimeLowerBound(resource)) - .map( - resource -> - new PipeTsFileInsertionEvent( - resource, - pipeTaskMeta, - PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) + .map(resource -> new PipeTsFileInsertionEvent(resource, pipeTaskMeta, pattern)) .collect(Collectors.toList())); pendingQueue.forEach( event -> diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java index c67e3966cc2..4c0b1c0e54e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java @@ -33,9 +33,8 @@ import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransfe import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferHandshakeReq; import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferInsertNodeReq; import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferTabletReq; -import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent; -import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.impl.PipeTsFileInsertionEvent; import org.apache.iotdb.db.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.PipeConnector; @@ -120,13 +119,11 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { try { if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent); - } 
else if (tabletInsertionEvent instanceof PipeTabletTabletInsertionEvent) { - doTransfer((PipeTabletTabletInsertionEvent) tabletInsertionEvent); - } else if (tabletInsertionEvent instanceof PipeEmptyTabletInsertionEvent) { - doTransfer((PipeEmptyTabletInsertionEvent) tabletInsertionEvent); + } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) { + doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent); } else { throw new NotImplementedException( - "IoTDBThriftConnectorV1 only support PipeInsertNodeTabletInsertionEvent and PipeTabletTabletInsertionEvent."); + "IoTDBThriftConnectorV1 only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent."); } } catch (TException e) { LOGGER.error( @@ -154,25 +151,21 @@ public class IoTDBThriftConnectorV1 implements PipeConnector { } } - private void doTransfer(PipeTabletTabletInsertionEvent pipeTabletTabletInsertionEvent) + private void doTransfer(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException, TException, IOException { final TPipeTransferResp resp = client.pipeTransfer( PipeTransferTabletReq.toTPipeTransferReq( - pipeTabletTabletInsertionEvent.convertToTablet())); + pipeRawTabletInsertionEvent.convertToTablet())); if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new PipeException( String.format( - "Transfer PipeTabletTabletInsertionEvent %s error, result status %s", - pipeTabletTabletInsertionEvent, resp.status)); + "Transfer PipeRawTabletInsertionEvent %s error, result status %s", + pipeRawTabletInsertionEvent, resp.status)); } } - private void doTransfer(PipeEmptyTabletInsertionEvent pipeEmptyTabletInsertionEvent) { - // do nothing - } - @Override public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception { // PipeProcessor can change the type of TabletInsertionEvent diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java deleted file mode 100644 index 855da8fa8b0..00000000000 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeEmptyTabletInsertionEvent.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iotdb.db.pipe.core.event.impl; - -import org.apache.iotdb.pipe.api.access.Row; -import org.apache.iotdb.pipe.api.collector.RowCollector; -import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; -import org.apache.iotdb.tsfile.write.record.Tablet; - -import java.util.function.BiConsumer; - -public class PipeEmptyTabletInsertionEvent implements TabletInsertionEvent { - @Override - public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) { - return this; - } - - @Override - public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) { - return this; - } -} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java index 416fea8b734..dd65ad4c319 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeInsertNodeTabletInsertionEvent.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.core.event.impl; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.pipe.core.event.EnrichedEvent; import org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer; @@ -116,7 +115,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent /////////////////////////// TabletInsertionEvent /////////////////////////// @Override - public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) { + public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer) { try { if (dataContainer == null) { 
dataContainer = new TabletInsertionDataContainer(getInsertNode(), getPattern()); @@ -129,7 +128,7 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent } @Override - public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) { + public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer) { try { if (dataContainer == null) { dataContainer = new TabletInsertionDataContainer(getInsertNode(), getPattern()); @@ -141,36 +140,11 @@ public class PipeInsertNodeTabletInsertionEvent extends EnrichedEvent } } - public Tablet convertToTablet() { - try { - if (dataContainer == null) { - dataContainer = new TabletInsertionDataContainer(getInsertNode(), getPattern()); - } - return dataContainer.convertToTablet(); - } catch (Exception e) { - LOGGER.error("Process tablet error.", e); - throw new PipeException("Process tablet error.", e); - } - } - - @TestOnly - public Tablet convertToTabletForTest(InsertNode insertNode, String pattern) { - try { - if (dataContainer == null) { - dataContainer = new TabletInsertionDataContainer(insertNode, pattern); - } - return dataContainer.convertToTablet(); - } catch (Exception e) { - LOGGER.error("Process tablet error.", e); - throw new PipeException("Process tablet error.", e); - } - } - /////////////////////////// Object /////////////////////////// @Override public String toString() { - return "PipeTabletTabletInsertionEvent{" + return "PipeRawTabletInsertionEvent{" + "walEntryHandler=" + walEntryHandler + ", progressIndex=" diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java similarity index 74% rename from server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java rename to server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java index 
014972abd7f..abb33534eb8 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletTabletInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeRawTabletInsertionEvent.java @@ -29,18 +29,18 @@ import org.apache.iotdb.tsfile.write.record.Tablet; import java.util.Objects; import java.util.function.BiConsumer; -public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent { +public class PipeRawTabletInsertionEvent implements TabletInsertionEvent { private final Tablet tablet; private final String pattern; private TabletInsertionDataContainer dataContainer; - public PipeTabletTabletInsertionEvent(Tablet tablet) { + public PipeRawTabletInsertionEvent(Tablet tablet) { this(Objects.requireNonNull(tablet), null); } - public PipeTabletTabletInsertionEvent(Tablet tablet, String pattern) { + public PipeRawTabletInsertionEvent(Tablet tablet, String pattern) { this.tablet = Objects.requireNonNull(tablet); this.pattern = pattern; } @@ -52,7 +52,7 @@ public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent { /////////////////////////// TabletInsertionEvent /////////////////////////// @Override - public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) { + public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer) { if (dataContainer == null) { dataContainer = new TabletInsertionDataContainer(tablet, getPattern()); } @@ -60,7 +60,7 @@ public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent { } @Override - public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) { + public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer) { if (dataContainer == null) { dataContainer = new TabletInsertionDataContainer(tablet, getPattern()); } @@ -68,8 +68,16 @@ public class PipeTabletTabletInsertionEvent implements TabletInsertionEvent { } public Tablet 
convertToTablet() { + final String pattern = getPattern(); + + // if pattern is "root", we don't need to convert, just return the original tablet + if (pattern.equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { + return tablet; + } + + // if pattern is not "root", we need to convert the tablet if (dataContainer == null) { - dataContainer = new TabletInsertionDataContainer(tablet, getPattern()); + dataContainer = new TabletInsertionDataContainer(tablet, pattern); } return dataContainer.convertToTablet(); } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java index c813619be9a..2de0cd31d19 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileInsertionEvent { @@ -152,12 +153,16 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns } return dataContainer.toTabletInsertionEvents(); } catch (InterruptedException e) { - String errorMsg = + final String errorMsg = String.format( "Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath()); LOGGER.warn(errorMsg); Thread.currentThread().interrupt(); throw new PipeException(errorMsg); + } catch (IOException e) { + final String errorMsg = String.format("Read TsFile %s error.", resource.getTsFilePath()); + LOGGER.warn(errorMsg); + throw new PipeException(errorMsg); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java index dfab1e1aa2b..85f4a210ccb 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java @@ -26,6 +26,7 @@ import org.apache.iotdb.pipe.api.type.Binary; import org.apache.iotdb.pipe.api.type.Type; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import java.util.Arrays; @@ -39,8 +40,9 @@ public class PipeRow implements Row { private final MeasurementSchema[] measurementSchemaList; private final long[] timestampColumn; - private final Object[] valueColumns; private final TSDataType[] valueColumnTypes; + private final Object[] valueColumns; + private final BitMap[] bitMaps; private final String[] columnNameStringList; @@ -49,15 +51,17 @@ public class PipeRow implements Row { String deviceId, MeasurementSchema[] measurementSchemaList, long[] timestampColumn, - Object[] valueColumns, TSDataType[] valueColumnTypes, + Object[] valueColumns, + BitMap[] bitMaps, String[] columnNameStringList) { this.rowIndex = rowIndex; this.deviceId = deviceId; this.measurementSchemaList = measurementSchemaList; this.timestampColumn = timestampColumn; - this.valueColumns = valueColumns; this.valueColumnTypes = valueColumnTypes; + this.valueColumns = valueColumns; + this.bitMaps = bitMaps; this.columnNameStringList = columnNameStringList; } @@ -103,7 +107,25 @@ public class PipeRow implements Row { @Override public Object getObject(int columnIndex) { - return ((Object[]) valueColumns[columnIndex])[rowIndex]; + switch (getDataType(columnIndex)) { + case INT32: + return getInt(columnIndex); + case INT64: + return getLong(columnIndex); + case FLOAT: + return getFloat(columnIndex); + case DOUBLE: + return 
getDouble(columnIndex); + case BOOLEAN: + return getBoolean(columnIndex); + case TEXT: + return getBinary(columnIndex); + default: + throw new UnsupportedOperationException( + String.format( + "unsupported data type %s for column %s", + getDataType(columnIndex), columnNameStringList[columnIndex])); + } } @Override @@ -113,7 +135,7 @@ public class PipeRow implements Row { @Override public boolean isNull(int columnIndex) { - return ((Object[]) valueColumns[columnIndex])[rowIndex] == null; + return bitMaps[columnIndex].isMarked(rowIndex); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java index ab0371252f3..8010b67e57e 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/collector/PipeRowCollector.java @@ -19,8 +19,7 @@ package org.apache.iotdb.db.pipe.core.event.view.collector; -import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent; -import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; @@ -35,6 +34,7 @@ import java.util.List; public class PipeRowCollector implements RowCollector { + private final List<TabletInsertionEvent> tabletInsertionEventList = new ArrayList<>(); private Tablet tablet = null; @Override @@ -63,16 +63,21 @@ public class PipeRowCollector implements RowCollector { } } tablet.rowSize++; - } - public TabletInsertionEvent toTabletInsertionEvent() { - if (tablet == null) { - return new PipeEmptyTabletInsertionEvent(); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + 
collectTabletInsertionEvent(); } + } - PipeTabletTabletInsertionEvent tabletInsertionEvent = - new PipeTabletTabletInsertionEvent(tablet); + private void collectTabletInsertionEvent() { + if (tablet != null) { + tabletInsertionEventList.add(new PipeRawTabletInsertionEvent(tablet)); + } this.tablet = null; - return tabletInsertionEvent; + } + + public Iterable<TabletInsertionEvent> convertToTabletInsertionEvents() { + collectTabletInsertionEvent(); + return tabletInsertionEventList; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java index 16d21f73138..542847074c0 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode; -import org.apache.iotdb.db.pipe.core.event.impl.PipeEmptyTabletInsertionEvent; import org.apache.iotdb.db.pipe.core.event.view.access.PipeRow; import org.apache.iotdb.db.pipe.core.event.view.collector.PipeRowCollector; import org.apache.iotdb.pipe.api.access.Row; @@ -40,6 +39,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.function.BiConsumer; @@ -52,8 +52,9 @@ public class TabletInsertionDataContainer { private String[] columnNameStringList; private long[] timestampColumn; - private Object[] valueColumns; private 
TSDataType[] valueColumnTypes; + // each column of Object[] is a column of primitive type array + private Object[] valueColumns; private BitMap[] nullValueColumnBitmaps; private int rowCount; @@ -89,7 +90,8 @@ public class TabletInsertionDataContainer { this.timestampColumn = new long[] {insertRowNode.getTime()}; generateColumnIndexMapper( - insertRowNode, pattern, originColumnIndex2FilteredColumnIndexMapperList); + insertRowNode.getMeasurements(), pattern, originColumnIndex2FilteredColumnIndexMapperList); + final int filteredColumnSize = Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList) .filter(Objects::nonNull) @@ -98,22 +100,46 @@ public class TabletInsertionDataContainer { this.measurementSchemaList = new MeasurementSchema[filteredColumnSize]; this.columnNameStringList = new String[filteredColumnSize]; - this.valueColumns = new Object[filteredColumnSize]; this.valueColumnTypes = new TSDataType[filteredColumnSize]; + this.valueColumns = new Object[filteredColumnSize]; this.nullValueColumnBitmaps = new BitMap[filteredColumnSize]; final MeasurementSchema[] originMeasurementSchemaList = insertRowNode.getMeasurementSchemas(); final String[] originColumnNameStringList = insertRowNode.getMeasurements(); - final Object[] originValueColumns = insertRowNode.getValues(); final TSDataType[] originValueColumnTypes = insertRowNode.getDataTypes(); + final Object[] originValueColumns = insertRowNode.getValues(); for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) { final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i]; this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; - this.valueColumns[filteredColumnIndex] = originValueColumns[i]; this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; + switch 
(originValueColumnTypes[i]) { + case INT32: + this.valueColumns[filteredColumnIndex] = new int[] {(Integer) originValueColumns[i]}; + break; + case INT64: + this.valueColumns[filteredColumnIndex] = new long[] {(Long) originValueColumns[i]}; + break; + case FLOAT: + this.valueColumns[filteredColumnIndex] = new float[] {(Float) originValueColumns[i]}; + break; + case DOUBLE: + this.valueColumns[filteredColumnIndex] = new double[] {(Double) originValueColumns[i]}; + break; + case BOOLEAN: + this.valueColumns[filteredColumnIndex] = + new boolean[] {(Boolean) originValueColumns[i]}; + break; + case TEXT: + this.valueColumns[filteredColumnIndex] = new Binary[] {(Binary) originValueColumns[i]}; + break; + default: + throw new UnSupportedDataTypeException( + String.format( + "Data type %s is not supported.", originValueColumnTypes[i].toString())); + } this.nullValueColumnBitmaps[filteredColumnIndex] = new BitMap(1); } } @@ -130,7 +156,9 @@ public class TabletInsertionDataContainer { this.timestampColumn = insertTabletNode.getTimes(); generateColumnIndexMapper( - insertTabletNode, pattern, originColumnIndex2FilteredColumnIndexMapperList); + insertTabletNode.getMeasurements(), + pattern, + originColumnIndex2FilteredColumnIndexMapperList); final int filteredColumnSize = Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList) @@ -140,15 +168,15 @@ public class TabletInsertionDataContainer { this.measurementSchemaList = new MeasurementSchema[filteredColumnSize]; this.columnNameStringList = new String[filteredColumnSize]; - this.valueColumns = new Object[filteredColumnSize]; this.valueColumnTypes = new TSDataType[filteredColumnSize]; + this.valueColumns = new Object[filteredColumnSize]; this.nullValueColumnBitmaps = new BitMap[filteredColumnSize]; final MeasurementSchema[] originMeasurementSchemaList = insertTabletNode.getMeasurementSchemas(); final String[] originColumnNameStringList = insertTabletNode.getMeasurements(); - final Object[] originValueColumns = 
insertTabletNode.getColumns(); final TSDataType[] originValueColumnTypes = insertTabletNode.getDataTypes(); + final Object[] originValueColumns = insertTabletNode.getColumns(); final BitMap[] originBitMapList = (insertTabletNode.getBitMaps() == null ? IntStream.range(0, originColumnSize) @@ -156,15 +184,19 @@ public class TabletInsertionDataContainer { .map(o -> new BitMap(timestampColumn.length)) .toArray(BitMap[]::new) : insertTabletNode.getBitMaps()); + for (int i = 0; i < originBitMapList.length; i++) { + if (originBitMapList[i] == null) { + originBitMapList[i] = new BitMap(timestampColumn.length); + } + } for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) { final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i]; this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; - this.valueColumns[filteredColumnIndex] = - convertToColumn(originValueColumns[i], originValueColumnTypes[i], originBitMapList[i]); this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; + this.valueColumns[filteredColumnIndex] = originValueColumns[i]; this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i]; } } @@ -179,7 +211,13 @@ public class TabletInsertionDataContainer { this.deviceId = tablet.deviceId; this.timestampColumn = tablet.timestamps; - generateColumnIndexMapper(tablet, pattern, originColumnIndex2FilteredColumnIndexMapperList); + final List<MeasurementSchema> originMeasurementSchemaList = tablet.getSchemas(); + final String[] originMeasurementList = new String[originMeasurementSchemaList.size()]; + for (int i = 0; i < originMeasurementSchemaList.size(); i++) { + originMeasurementList[i] = originMeasurementSchemaList.get(i).getMeasurementId(); + } + generateColumnIndexMapper( + originMeasurementList, pattern, 
originColumnIndex2FilteredColumnIndexMapperList); final int filteredColumnSize = Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList) @@ -189,11 +227,10 @@ public class TabletInsertionDataContainer { this.measurementSchemaList = new MeasurementSchema[filteredColumnSize]; this.columnNameStringList = new String[filteredColumnSize]; - this.valueColumns = new Object[filteredColumnSize]; this.valueColumnTypes = new TSDataType[filteredColumnSize]; + this.valueColumns = new Object[filteredColumnSize]; this.nullValueColumnBitmaps = new BitMap[filteredColumnSize]; - final List<MeasurementSchema> originMeasurementSchemaList = tablet.getSchemas(); final String[] originColumnNameStringList = new String[originColumnSize]; final TSDataType[] originValueColumnTypes = new TSDataType[originColumnSize]; for (int i = 0; i < originColumnSize; i++) { @@ -201,16 +238,26 @@ public class TabletInsertionDataContainer { originValueColumnTypes[i] = originMeasurementSchemaList.get(i).getType(); } final Object[] originValueColumns = tablet.values; - final BitMap[] originBitMapList = tablet.bitMaps; + final BitMap[] originBitMapList = + tablet.bitMaps == null + ? 
IntStream.range(0, originColumnSize) + .boxed() + .map(o -> new BitMap(timestampColumn.length)) + .toArray(BitMap[]::new) + : tablet.bitMaps; + for (int i = 0; i < originBitMapList.length; i++) { + if (originBitMapList[i] == null) { + originBitMapList[i] = new BitMap(timestampColumn.length); + } + } for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) { final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i]; this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList.get(i); this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; - this.valueColumns[filteredColumnIndex] = - convertToColumn(originValueColumns[i], originValueColumnTypes[i], originBitMapList[i]); this.valueColumnTypes[filteredColumnIndex] = originValueColumnTypes[i]; + this.valueColumns[filteredColumnIndex] = originValueColumns[i]; this.nullValueColumnBitmaps[filteredColumnIndex] = originBitMapList[i]; } } @@ -218,6 +265,7 @@ public class TabletInsertionDataContainer { rowCount = tablet.rowSize; } + // TODO: cache the result keyed by deviceId to improve performance private void generateColumnIndexMapper( String[] originMeasurementList, String pattern, @@ -243,7 +291,6 @@ public class TabletInsertionDataContainer { // low cost check comes first if (pattern.length() == deviceId.length() + measurement.length() + 1 // high cost check comes later - && pattern.startsWith(deviceId) && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) { originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++; } @@ -251,107 +298,34 @@ public class TabletInsertionDataContainer { } } - private void generateColumnIndexMapper( - InsertNode insertNode, - String pattern, - Integer[] originColumnIndex2FilteredColumnIndexMapperList) { - generateColumnIndexMapper( - insertNode.getMeasurements(), pattern, 
originColumnIndex2FilteredColumnIndexMapperList); - } - - private void generateColumnIndexMapper( - Tablet tablet, String pattern, Integer[] originColumnIndex2FilteredColumnIndexMapperList) { - final List<MeasurementSchema> originMeasurementSchemaList = tablet.getSchemas(); - final String[] originMeasurementList = new String[originMeasurementSchemaList.size()]; - for (int i = 0; i < originMeasurementSchemaList.size(); i++) { - originMeasurementList[i] = originMeasurementSchemaList.get(i).getMeasurementId(); - } - generateColumnIndexMapper( - originMeasurementList, pattern, originColumnIndex2FilteredColumnIndexMapperList); - } - - private Object convertToColumn(Object originColumn, TSDataType dataType, BitMap bitMap) { - switch (dataType) { - case INT32: - final int[] intValues = (int[]) originColumn; - final int[] integerValues = new int[intValues.length]; - for (int i = 0; i < intValues.length; i++) { - integerValues[i] = bitMap != null && bitMap.isMarked(i) ? 0 : intValues[i]; - } - return integerValues; - case INT64: - final long[] longValues = (long[]) originColumn; - final long[] longValues2 = new long[longValues.length]; - for (int i = 0; i < longValues.length; i++) { - longValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : longValues[i]; - } - return longValues2; - case FLOAT: - final float[] floatValues = (float[]) originColumn; - final float[] floatValues2 = new float[floatValues.length]; - for (int i = 0; i < floatValues.length; i++) { - floatValues2[i] = bitMap != null && bitMap.isMarked(i) ? 0 : floatValues[i]; - } - return floatValues2; - case DOUBLE: - final double[] doubleValues = (double[]) originColumn; - final double[] doubleValues2 = new double[doubleValues.length]; - for (int i = 0; i < doubleValues.length; i++) { - doubleValues2[i] = bitMap != null && bitMap.isMarked(i) ? 
0 : doubleValues[i]; - } - return doubleValues2; - case BOOLEAN: - final boolean[] booleanValues = (boolean[]) originColumn; - final boolean[] booleanValues2 = new boolean[booleanValues.length]; - for (int i = 0; i < booleanValues.length; i++) { - booleanValues2[i] = (bitMap == null || !bitMap.isMarked(i)) && booleanValues[i]; - } - return booleanValues2; - case TEXT: - final Binary[] binaryValues = (Binary[]) originColumn; - final Binary[] stringValues = new Binary[binaryValues.length]; - for (int i = 0; i < binaryValues.length; i++) { - stringValues[i] = - bitMap != null && bitMap.isMarked(i) - ? null - : (binaryValues[i] == null - ? null - : Binary.valueOf(binaryValues[i].getStringValue())); - } - return stringValues; - default: - throw new UnSupportedDataTypeException( - String.format("Data type %s is not supported.", dataType)); - } - } - //////////////////////////// process //////////////////////////// - public TabletInsertionEvent processRowByRow(BiConsumer<Row, RowCollector> consumer) { - final PipeRowCollector rowCollector = new PipeRowCollector(); - if (valueColumns.length == 0) { - return new PipeEmptyTabletInsertionEvent(); + public Iterable<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector> consumer) { + if (valueColumns.length == 0 || timestampColumn.length == 0) { + return Collections.emptyList(); } - for (int i = 0; i < timestampColumn.length; i++) { + final PipeRowCollector rowCollector = new PipeRowCollector(); + for (int i = 0; i < rowCount; i++) { consumer.accept( new PipeRow( i, deviceId, measurementSchemaList, timestampColumn, - valueColumns, valueColumnTypes, + valueColumns, + nullValueColumnBitmaps, columnNameStringList), rowCollector); } - return rowCollector.toTabletInsertionEvent(); + return rowCollector.convertToTabletInsertionEvents(); } - public TabletInsertionEvent processTablet(BiConsumer<Tablet, RowCollector> consumer) { + public Iterable<TabletInsertionEvent> processTablet(BiConsumer<Tablet, RowCollector> consumer) 
{ final PipeRowCollector rowCollector = new PipeRowCollector(); consumer.accept(convertToTablet(), rowCollector); - return rowCollector.toTabletInsertionEvent(); + return rowCollector.convertToTabletInsertionEvents(); } //////////////////////////// convert //////////////////////////// @@ -362,10 +336,8 @@ public class TabletInsertionDataContainer { } final int columnSize = measurementSchemaList.length; - final List<MeasurementSchema> measurementSchemaArrayList = new ArrayList<>(Arrays.asList(measurementSchemaList).subList(0, columnSize)); - final Tablet newTablet = new Tablet(deviceId, measurementSchemaArrayList, rowCount); newTablet.timestamps = timestampColumn; newTablet.bitMaps = nullValueColumnBitmaps; @@ -373,6 +345,7 @@ public class TabletInsertionDataContainer { newTablet.rowSize = rowCount; tablet = newTablet; + return tablet; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java index 9035a8fe0bb..79274e48fc7 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java @@ -19,13 +19,13 @@ package org.apache.iotdb.db.pipe.core.event.view.datastructure; -import org.apache.iotdb.db.pipe.core.event.impl.PipeTabletTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; -import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileReader; import 
org.apache.iotdb.tsfile.read.TsFileSequenceReader; -import org.apache.iotdb.tsfile.write.record.Tablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,100 +37,127 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; -public class TsFileInsertionDataContainer { +public class TsFileInsertionDataContainer implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(TsFileInsertionDataContainer.class); - private final File tsFile; private final String pattern; - private TimeseriesMetadata vectorTimeseriesMetadata; + private final TsFileSequenceReader tsFileSequenceReader; + private final TsFileReader tsFileReader; - private final Map<String, List<TimeseriesMetadata>> device2TimeseriesMetadataMap; + private final Iterator<Map.Entry<String, List<String>>> deviceMeasurementsMapIterator; + private final Map<String, TSDataType> measurementDataTypeMap; - public TsFileInsertionDataContainer(File tsFile, String pattern) { - this.tsFile = tsFile; + public TsFileInsertionDataContainer(File tsFile, String pattern) throws IOException { this.pattern = pattern; - this.device2TimeseriesMetadataMap = collectDevice2TimeseriesMetadataMap(); + tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath()); + tsFileReader = new TsFileReader(tsFileSequenceReader); + + deviceMeasurementsMapIterator = filterDeviceMeasurementsMapByPattern().entrySet().iterator(); + measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap(); } - private Map<String, List<TimeseriesMetadata>> collectDevice2TimeseriesMetadataMap() { - final Map<String, List<TimeseriesMetadata>> result = new HashMap<>(); + private Map<String, List<String>> filterDeviceMeasurementsMapByPattern() throws IOException { + final Map<String, List<String>> filteredDeviceMeasurementsMap = new HashMap<>(); - try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getPath())) 
{ - // match pattern - for (Map.Entry<String, List<TimeseriesMetadata>> entry : - reader.getAllTimeseriesMetadata(true).entrySet()) { - final String device = entry.getKey(); - boolean isVector = false; + for (Map.Entry<String, List<String>> entry : + tsFileSequenceReader.getDeviceMeasurementsMap().entrySet()) { + final String deviceId = entry.getKey(); - // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c - // in this case, all data can be matched without checking the measurements - if (pattern == null || pattern.length() <= device.length() && device.startsWith(pattern)) { - result.put(device, entry.getValue()); + // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c + // in this case, all data can be matched without checking the measurements + if (pattern == null + || pattern.length() <= deviceId.length() && deviceId.startsWith(pattern)) { + if (!entry.getValue().isEmpty()) { + filteredDeviceMeasurementsMap.put(deviceId, entry.getValue()); } + } - // case 2: for example, pattern is root.a.b.c and device is root.a.b - // in this case, we need to check the full path - else { - final List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); - - for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) { - // TODO: test me!!! 
- if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) { - vectorTimeseriesMetadata = timeseriesMetadata; - isVector = false; - continue; - } - - final String measurement = timeseriesMetadata.getMeasurementId(); - // low cost check comes first - if (pattern.length() == measurement.length() + device.length() + 1 - // high cost check comes later - && pattern.startsWith(device) - && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) { - if (!isVector) { - isVector = true; - timeseriesMetadataList.add(vectorTimeseriesMetadata); - } - timeseriesMetadataList.add(timeseriesMetadata); - } + // case 2: for example, pattern is root.a.b.c and device is root.a.b + // in this case, we need to check the full path + else if (pattern.length() > deviceId.length() && pattern.startsWith(deviceId)) { + final List<String> filteredMeasurements = new ArrayList<>(); + + for (final String measurement : entry.getValue()) { + // low cost check comes first + if (pattern.length() == deviceId.length() + measurement.length() + 1 + // high cost check comes later + && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) { + filteredMeasurements.add(measurement); } + } - if (!timeseriesMetadataList.isEmpty()) { - result.put(device, timeseriesMetadataList); - } + if (!filteredMeasurements.isEmpty()) { + filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements); } } - } catch (IOException e) { - LOGGER.error("Cannot read TsFile {}.", tsFile.getPath(), e); } - return result; + return filteredDeviceMeasurementsMap; } + /** @return TabletInsertionEvent in a streaming way */ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() { return () -> new Iterator<TabletInsertionEvent>() { - private final Iterator<Tablet> tabletIterator = constructTabletIterable().iterator(); + private TsFileInsertionDataTabletIterator tabletIterator = null; @Override public boolean hasNext() { - return tabletIterator.hasNext(); + return (tabletIterator != null && 
tabletIterator.hasNext()) + || deviceMeasurementsMapIterator.hasNext(); } @Override public TabletInsertionEvent next() { - return new PipeTabletTabletInsertionEvent(tabletIterator.next()); + if (!hasNext()) { + throw new NoSuchElementException(); + } + + while (tabletIterator == null || !tabletIterator.hasNext()) { + if (!deviceMeasurementsMapIterator.hasNext()) { + throw new NoSuchElementException(); + } + + final Map.Entry<String, List<String>> entry = deviceMeasurementsMapIterator.next(); + + try { + tabletIterator = + new TsFileInsertionDataTabletIterator( + tsFileReader, measurementDataTypeMap, entry.getKey(), entry.getValue()); + } catch (IOException e) { + throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e); + } + } + + final TabletInsertionEvent next = + new PipeRawTabletInsertionEvent(tabletIterator.next()); + + if (!hasNext()) { + try { + close(); + } catch (Exception e) { + LOGGER.warn("Failed to close TsFileInsertionDataContainer", e); + } + } + + return next; } }; } - private Iterable<Tablet> constructTabletIterable() { - return () -> - new TsFileInsertionDataTabletIterator(tsFile.getPath(), device2TimeseriesMetadataMap); + @Override + public void close() throws Exception { + if (tsFileReader != null) { + tsFileReader.close(); + } + if (tsFileSequenceReader != null) { + tsFileSequenceReader.close(); + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java index 7100e68b753..15959adb7ee 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java @@ -20,101 +20,72 @@ package org.apache.iotdb.db.pipe.core.event.view.datastructure; import 
org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; -import org.apache.iotdb.tsfile.encoding.decoder.Decoder; -import org.apache.iotdb.tsfile.file.MetaMarker; -import org.apache.iotdb.tsfile.file.header.ChunkHeader; -import org.apache.iotdb.tsfile.file.header.PageHeader; -import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; -import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.read.TsFileSequenceReader; -import org.apache.iotdb.tsfile.read.common.BatchData; -import org.apache.iotdb.tsfile.read.reader.page.PageReader; -import org.apache.iotdb.tsfile.read.reader.page.TimePageReader; -import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader; -import org.apache.iotdb.tsfile.utils.BitMap; -import org.apache.iotdb.tsfile.utils.TsPrimitiveType; +import org.apache.iotdb.tsfile.read.TsFileReader; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.expression.QueryExpression; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.stream.Collectors; public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> { - private static Logger LOGGER = 
LoggerFactory.getLogger(TsFileInsertionDataTabletIterator.class); - private final TsFileSequenceReader reader; - private final String filePath; - private final Iterator<Map.Entry<String, List<TimeseriesMetadata>>> entriesIterator; - private Map.Entry<String, List<TimeseriesMetadata>> currentEntry; - private Iterator<TimeseriesMetadata> timeseriesMetadataIterator; - private TimeseriesMetadata currentTimeseriesMetadata; - private List<MeasurementSchema> measurementSchemas; + private final TsFileReader tsFileReader; + private final Map<String, TSDataType> measurementDataTypeMap; - private boolean isAligned; - private final List<long[]> timeBatches; - private long[] timestampsForAligned; + private final String deviceId; + private final List<String> measurements; - public TsFileInsertionDataTabletIterator( - String filePath, Map<String, List<TimeseriesMetadata>> device2TimeseriesMetadataMap) { - this.filePath = filePath; - this.entriesIterator = device2TimeseriesMetadataMap.entrySet().iterator(); - this.timeBatches = new ArrayList<>(); - this.currentEntry = null; - this.timeseriesMetadataIterator = null; - this.currentTimeseriesMetadata = null; - this.measurementSchemas = null; - this.isAligned = false; - this.timestampsForAligned = null; - try { - this.reader = new TsFileSequenceReader(filePath); - } catch (IOException e) { - throw new PipeException("Cannot create TsFileSequenceReader for file " + filePath, e); - } + private final QueryDataSet queryDataSet; - // Initialize timeseriesMetadataIterator if there is a next entry - if (entriesIterator.hasNext()) { - currentEntry = entriesIterator.next(); - timeseriesMetadataIterator = currentEntry.getValue().iterator(); - } else { - timeseriesMetadataIterator = - new Iterator<TimeseriesMetadata>() { - @Override - public boolean hasNext() { - return false; - } + public TsFileInsertionDataTabletIterator( + TsFileReader tsFileReader, + Map<String, TSDataType> measurementDataTypeMap, + String deviceId, + List<String> 
measurements) + throws IOException { + this.tsFileReader = tsFileReader; + this.measurementDataTypeMap = measurementDataTypeMap; + + this.deviceId = deviceId; + this.measurements = + measurements.stream() + .filter( + measurement -> + // time column in aligned time-series should not be a query column + measurement != null && !measurement.isEmpty()) + .sorted() + .collect(Collectors.toList()); + + this.queryDataSet = buildQueryDataSet(); + } - @Override - public TimeseriesMetadata next() { - return null; - } - }; + private QueryDataSet buildQueryDataSet() throws IOException { + final List<Path> paths = new ArrayList<>(); + for (String measurement : measurements) { + paths.add(new Path(deviceId, measurement, false)); } + return tsFileReader.query(QueryExpression.create(paths, null)); } @Override public boolean hasNext() { - boolean hasNext = timeseriesMetadataIterator.hasNext() || entriesIterator.hasNext(); - if (!hasNext) { - try { - reader.close(); - } catch (IOException e) { - LOGGER.warn("Cannot close TsFileSequenceReader for file {}", filePath, e); - } + try { + return queryDataSet.hasNext(); + } catch (IOException e) { + throw new PipeException("Failed to check next", e); } - return hasNext; } @Override @@ -123,165 +94,48 @@ public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> { throw new NoSuchElementException(); } - if (!timeseriesMetadataIterator.hasNext()) { - currentEntry = entriesIterator.next(); - timeseriesMetadataIterator = currentEntry.getValue().iterator(); - } - currentTimeseriesMetadata = timeseriesMetadataIterator.next(); - measurementSchemas = new ArrayList<>(); - - try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) { - if (currentTimeseriesMetadata.getTSDataType() == TSDataType.VECTOR) { - processTimeseriesMetadata(currentTimeseriesMetadata, reader); - currentTimeseriesMetadata = timeseriesMetadataIterator.next(); - } - return processTimeseriesMetadata(currentTimeseriesMetadata, reader); + try { + return 
buildNextTablet(); } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private Tablet createTablet(long[] timestamps, Object[] values, BitMap[] bitMaps) { - long[] tmp; - - if (isAligned) { - if (timestampsForAligned == null) { - timestampsForAligned = timestamps; - return null; - } - tmp = timestampsForAligned; - } else { - tmp = timestamps; + throw new PipeException("Failed to build tablet", e); } - - // create tablet - int rowSize = tmp.length; - Tablet tablet = new Tablet(currentEntry.getKey(), measurementSchemas, rowSize); - tablet.timestamps = tmp; - tablet.values = values; - tablet.rowSize = rowSize; - tablet.bitMaps = bitMaps; - - return tablet; } - private Tablet processTimeseriesMetadata( - TimeseriesMetadata timeseriesMetadata, TsFileSequenceReader reader) { - int pageIndex = 0; - if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) { - isAligned = true; - timeBatches.clear(); - } else { - MeasurementSchema measurementSchema = - new MeasurementSchema( - timeseriesMetadata.getMeasurementId(), timeseriesMetadata.getTSDataType()); - measurementSchemas.add(measurementSchema); + private Tablet buildNextTablet() throws IOException { + final List<MeasurementSchema> schemas = new ArrayList<>(); + for (final String measurement : measurements) { + final TSDataType dataType = + measurementDataTypeMap.get(deviceId + TsFileConstant.PATH_SEPARATOR + measurement); + schemas.add(new MeasurementSchema(measurement, dataType)); } + final Tablet tablet = new Tablet(deviceId, schemas); + tablet.initBitMaps(); - List<Byte> bitMapBytes = new ArrayList<>(); - List<Object> measurementValues = new ArrayList<>(); - List<Long> measurementTimestamps = new ArrayList<>(); - - for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) { - long offset = chunkMetadata.getOffsetOfChunkHeader(); - try { - reader.position(offset); - ChunkHeader header = reader.readChunkHeader(reader.readMarker()); - int dataSize = header.getDataSize(); - - 
Decoder defaultTimeDecoder = - Decoder.getDecoderByType( - TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), - TSDataType.INT64); - Decoder valueDecoder = - Decoder.getDecoderByType(header.getEncodingType(), header.getDataType()); - pageIndex = 0; - if (header.getDataType() == TSDataType.VECTOR) { - timeBatches.clear(); - } - - while (dataSize > 0) { - PageHeader pageHeader = - reader.readPageHeader( - header.getDataType(), (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); - ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType()); + while (queryDataSet.hasNext()) { + final RowRecord rowRecord = queryDataSet.next(); - // Time column chunk - if ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) - == TsFileConstant.TIME_COLUMN_MASK) { - TimePageReader timePageReader = - new TimePageReader(pageHeader, pageData, defaultTimeDecoder); - long[] timeBatch = timePageReader.getNextTimeBatch(); - timeBatches.add(timeBatch); + final int rowIndex = tablet.rowSize; - for (long time : timeBatch) { - measurementTimestamps.add(time); - } - } - // Value column chunk - else if ((header.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK) - == TsFileConstant.VALUE_COLUMN_MASK) { - ValuePageReader valuePageReader = - new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder); + tablet.addTimestamp(rowIndex, rowRecord.getTimestamp()); - for (byte value : valuePageReader.getBitmap()) { - bitMapBytes.add(value); - } - - for (TsPrimitiveType value : - valuePageReader.nextValueBatch(timeBatches.get(pageIndex))) { - measurementValues.add(value.getValue()); - } - } - - // NonAligned Chunk - else { - PageReader pageReader = - new PageReader( - pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null); - BatchData batchData = pageReader.getAllSatisfiedPageData(); - List<Integer> isNullList = new ArrayList<>(); - int index = 0; - while (batchData.hasCurrent()) { - 
measurementTimestamps.add(batchData.currentTime()); - Object value = batchData.currentValue(); - - if (value == null) { - isNullList.add(index); - } - measurementValues.add(value); - index++; - batchData.next(); - } - - BitMap bitmap = new BitMap(measurementTimestamps.size()); - for (int isNull : isNullList) { - bitmap.mark(isNull); - } - byte[] bytes = bitmap.getByteArray(); - for (byte value : bytes) { - bitMapBytes.add(value); - } - } - pageIndex++; - dataSize -= pageHeader.getSerializedPageSize(); + final List<Field> fields = rowRecord.getFields(); + final int fieldSize = fields.size(); + for (int i = 0; i < fieldSize; i++) { + final Field field = fields.get(i); + if (field == null || field.getDataType() == null) { + tablet.bitMaps[i].mark(rowIndex); + } else { + tablet.addValue(measurements.get(i), rowIndex, field.getObjectValue(field.getDataType())); } - } catch (IOException e) { - throw new UncheckedIOException(e); } - } - long[] timestamps = new long[measurementTimestamps.size()]; - for (int i = 0; i < measurementTimestamps.size(); i++) { - timestamps[i] = measurementTimestamps.get(i); - } + tablet.rowSize++; - byte[] byteArray = new byte[bitMapBytes.size()]; - for (int i = 0; i < bitMapBytes.size(); i++) { - byteArray[i] = bitMapBytes.get(i); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + break; + } } - BitMap[] bitMaps = new BitMap[] {new BitMap(byteArray.length, byteArray)}; - return createTablet(timestamps, measurementValues.toArray(), bitMaps); + return tablet; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java index 62979b7d52c..c9774b4618a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/processor/PipeDoNothingProcessor.java @@ -56,15 +56,23 @@ public class PipeDoNothingProcessor implements 
PipeProcessor { .equals(PipeCollectorConstant.COLLECTOR_PATTERN_DEFAULT_VALUE)) { eventCollector.collect(tabletInsertionEvent); } else { - eventCollector.collect( - tabletInsertionEvent.processRowByRow( + tabletInsertionEvent + .processRowByRow( (row, rowCollector) -> { try { rowCollector.collectRow(row); } catch (IOException e) { throw new PipeException("Failed to collect row", e); } - })); + }) + .forEach( + event -> { + try { + eventCollector.collect(event); + } catch (IOException e) { + throw new PipeException("Failed to collect event", e); + } + }); } } else { eventCollector.collect(tabletInsertionEvent); diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEventTest.java similarity index 78% rename from server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java rename to server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEventTest.java index 5a211fc2cff..6a68b5ad7cd 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeInsertNodeTabletInsertionEventTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/PipeTabletInsertionEventTest.java @@ -24,7 +24,8 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode; -import org.apache.iotdb.db.pipe.core.event.impl.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.view.datastructure.TabletInsertionDataContainer; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; 
@@ -37,7 +38,7 @@ import org.junit.Test; import java.util.Arrays; -public class PipeInsertNodeTabletInsertionEventTest { +public class PipeTabletInsertionEventTest { InsertRowNode insertRowNode; InsertTabletNode insertTabletNode; @@ -56,7 +57,6 @@ public class PipeInsertNodeTabletInsertionEventTest { }; final MeasurementSchema[] schemas = new MeasurementSchema[6]; - final Object[] values = new Object[6]; final String pattern = "root.sg.d1"; @@ -78,6 +78,14 @@ public class PipeInsertNodeTabletInsertionEventTest { } private void createInsertRowNode() throws IllegalPathException { + final Object[] values = new Object[6]; + + values[0] = 100; + values[1] = 10000L; + values[2] = 2F; + values[3] = 1.0; + values[4] = false; + values[5] = Binary.valueOf("text"); insertRowNode = new InsertRowNode( @@ -93,6 +101,24 @@ public class PipeInsertNodeTabletInsertionEventTest { } private void createInsertTabletNode() throws IllegalPathException { + final Object[] values = new Object[6]; + + values[0] = new int[5]; + values[1] = new long[5]; + values[2] = new float[5]; + values[3] = new double[5]; + values[4] = new boolean[5]; + values[5] = new Binary[5]; + + for (int r = 0; r < 5; r++) { + ((int[]) values[0])[r] = 100; + ((long[]) values[1])[r] = 10000; + ((float[]) values[2])[r] = 2; + ((double[]) values[3])[r] = 1.0; + ((boolean[]) values[4])[r] = false; + ((Binary[]) values[5])[r] = Binary.valueOf("text"); + } + this.insertTabletNode = new InsertTabletNode( new PlanNodeId("plannode 1"), @@ -108,6 +134,7 @@ public class PipeInsertNodeTabletInsertionEventTest { } private void createTablet() { + final Object[] values = new Object[6]; // create tablet for insertRowNode BitMap[] bitMapsForInsertRowNode = new BitMap[6]; @@ -167,12 +194,18 @@ public class PipeInsertNodeTabletInsertionEventTest { @Test public void convertToTabletForTest() { - PipeInsertNodeTabletInsertionEvent event1 = new PipeInsertNodeTabletInsertionEvent(null, null); - Tablet tablet1 = 
event1.convertToTabletForTest(insertRowNode, pattern); + Tablet tablet1 = new TabletInsertionDataContainer(insertRowNode, pattern).convertToTablet(); Assert.assertEquals(tablet1, tabletForInsertRowNode); - PipeInsertNodeTabletInsertionEvent event2 = new PipeInsertNodeTabletInsertionEvent(null, null); - Tablet tablet2 = event2.convertToTabletForTest(insertTabletNode, pattern); + Tablet tablet2 = new TabletInsertionDataContainer(insertTabletNode, pattern).convertToTablet(); Assert.assertEquals(tablet2, tabletForInsertTabletNode); + + PipeRawTabletInsertionEvent event3 = new PipeRawTabletInsertionEvent(tablet1, pattern); + Tablet tablet3 = event3.convertToTablet(); + Assert.assertEquals(tablet1, tablet3); + + PipeRawTabletInsertionEvent event4 = new PipeRawTabletInsertionEvent(tablet2, pattern); + Tablet tablet4 = event4.convertToTablet(); + Assert.assertEquals(tablet2, tablet4); } } diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/TsFileInsertionDataContainerTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/TsFileInsertionDataContainerTest.java new file mode 100644 index 00000000000..d74f790fd15 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/TsFileInsertionDataContainerTest.java @@ -0,0 +1,608 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.core.event; + +import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.view.datastructure.TsFileInsertionDataContainer; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.fail; + +public class TsFileInsertionDataContainerTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(TsFileInsertionDataContainerTest.class); + + private File alignedTsFile; + private File nonalignedTsFile; + + @After + public void tearDown() throws Exception { + if (alignedTsFile != null) { + alignedTsFile.delete(); + } + if (nonalignedTsFile != null) { + nonalignedTsFile.delete(); + } + } + + @Test + public void testToTabletInsertionEvents() throws Exception { + Set<Integer> deviceNumbers = new HashSet<>(); + deviceNumbers.add(1); + deviceNumbers.add(2); + + Set<Integer> measurementNumbers = new HashSet<>(); + measurementNumbers.add(1); + measurementNumbers.add(2); + + for (int deviceNumber : deviceNumbers) { + for (int measurementNumber : measurementNumbers) { + testToTabletInsertionEvents(deviceNumber, measurementNumber, 0); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 2); + + testToTabletInsertionEvents(deviceNumber, measurementNumber, 999); + testToTabletInsertionEvents(deviceNumber, 
measurementNumber, 1000); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001); + + testToTabletInsertionEvents(deviceNumber, measurementNumber, 999 * 2 + 1); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000 * 2); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001 * 2 - 1); + + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025); + + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023 * 2 + 1); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024 * 2); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025 * 2 - 1); + + testToTabletInsertionEvents(deviceNumber, measurementNumber, 10001); + } + } + } + + private void testToTabletInsertionEvents( + int deviceNumber, int measurementNumber, int rowNumberInOneDevice) throws Exception { + LOGGER.info( + "testToTabletInsertionEvents: deviceNumber = {}, measurementNumber = {}, rowNumberInOneDevice = {}", + deviceNumber, + measurementNumber, + rowNumberInOneDevice); + + alignedTsFile = + TsFileGeneratorUtils.generateAlignedTsFile( + "aligned.tsfile", + deviceNumber, + measurementNumber, + rowNumberInOneDevice, + 300, + 10000, + 700, + 50); + nonalignedTsFile = + TsFileGeneratorUtils.generateNonAlignedTsFile( + "nonaligned.tsfile", + deviceNumber, + measurementNumber, + rowNumberInOneDevice, + 300, + 10000, + 700, + 50); + + try (final TsFileInsertionDataContainer alignedContainer = + new TsFileInsertionDataContainer(alignedTsFile, "root"); + final TsFileInsertionDataContainer nonalignedContainer = + new TsFileInsertionDataContainer(nonalignedTsFile, "root"); ) { + AtomicInteger count1 = new AtomicInteger(0); + AtomicInteger count2 = new AtomicInteger(0); + AtomicInteger count3 = new AtomicInteger(0); + + alignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + 
.processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count1.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals( + measurementNumber, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + })))); + + Assert.assertEquals(count1.getAndSet(0), deviceNumber * rowNumberInOneDevice); + Assert.assertEquals(count2.getAndSet(0), deviceNumber * rowNumberInOneDevice); + Assert.assertEquals(count3.getAndSet(0), deviceNumber * rowNumberInOneDevice); + + nonalignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + .processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count1.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processRowByRow( + (row, collector) -> { + try { + 
collector.collectRow(row); + Assert.assertEquals( + measurementNumber, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })))); + + Assert.assertEquals(count1.get(), deviceNumber * rowNumberInOneDevice); + Assert.assertEquals(count2.get(), deviceNumber * rowNumberInOneDevice); + Assert.assertEquals(count3.get(), deviceNumber * rowNumberInOneDevice); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + AtomicReference<String> oneDeviceInAlignedTsFile = new AtomicReference<>(); + AtomicReference<String> oneMeasurementInAlignedTsFile = new AtomicReference<>(); + + AtomicReference<String> oneDeviceInUnalignedTsFile = new AtomicReference<>(); + AtomicReference<String> oneMeasurementInUnalignedTsFile = new AtomicReference<>(); + + try (TsFileSequenceReader alignedReader = + new TsFileSequenceReader(alignedTsFile.getAbsolutePath()); + TsFileSequenceReader nonalignedReader = + new TsFileSequenceReader(nonalignedTsFile.getAbsolutePath())) { + + alignedReader + .getDeviceMeasurementsMap() + .forEach( + (k, v) -> + v.stream() + .filter(p -> p != null && !p.isEmpty()) + .forEach( + p -> { + oneDeviceInAlignedTsFile.set(k); + oneMeasurementInAlignedTsFile.set(new Path(k, p, false).toString()); + })); + nonalignedReader + .getDeviceMeasurementsMap() + .forEach( + (k, v) -> + v.stream() + .filter(p -> p != null && !p.isEmpty()) + .forEach( + p -> { + oneDeviceInUnalignedTsFile.set(k); + oneMeasurementInUnalignedTsFile.set(new Path(k, p, false).toString()); + })); + } + + try (final TsFileInsertionDataContainer alignedContainer = + new TsFileInsertionDataContainer(alignedTsFile, oneDeviceInAlignedTsFile.get()); + final TsFileInsertionDataContainer nonalignedContainer = + new TsFileInsertionDataContainer( + nonalignedTsFile, oneDeviceInUnalignedTsFile.get()); ) { + AtomicInteger count1 = new AtomicInteger(0); + AtomicInteger count2 = new AtomicInteger(0); + AtomicInteger count3 = new 
AtomicInteger(0); + + alignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count1.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals( + measurementNumber, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + })))); + + Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice); + Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice); + Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice); + + nonalignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + .processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count1.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + 
tabletInsertionEvent2.processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals( + measurementNumber, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })))); + + Assert.assertEquals(count1.get(), rowNumberInOneDevice); + Assert.assertEquals(count2.get(), rowNumberInOneDevice); + Assert.assertEquals(count3.get(), rowNumberInOneDevice); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (final TsFileInsertionDataContainer alignedContainer = + new TsFileInsertionDataContainer(alignedTsFile, oneMeasurementInAlignedTsFile.get()); + final TsFileInsertionDataContainer nonalignedContainer = + new TsFileInsertionDataContainer( + nonalignedTsFile, oneMeasurementInUnalignedTsFile.get()); ) { + AtomicInteger count1 = new AtomicInteger(0); + AtomicInteger count2 = new AtomicInteger(0); + AtomicInteger count3 = new AtomicInteger(0); + + alignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(1, row.size()); + count1.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(1, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals(1, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + })))); + + Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice); + 
Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice); + Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice); + + nonalignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + .processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals(1, row.size()); + count1.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(1, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(1, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })))); + + Assert.assertEquals(count1.get(), rowNumberInOneDevice); + Assert.assertEquals(count2.get(), rowNumberInOneDevice); + Assert.assertEquals(count3.get(), rowNumberInOneDevice); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (final TsFileInsertionDataContainer alignedContainer = + new TsFileInsertionDataContainer(alignedTsFile, "not-exist-pattern"); + final TsFileInsertionDataContainer nonalignedContainer = + new TsFileInsertionDataContainer(nonalignedTsFile, "not-exist-pattern"); ) { + AtomicInteger count1 = new AtomicInteger(0); + AtomicInteger count2 = new AtomicInteger(0); + AtomicInteger count3 = new AtomicInteger(0); + + alignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(0, row.size()); + count1.incrementAndGet(); + } catch 
(IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(0, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals(0, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + })))); + + Assert.assertEquals(count1.getAndSet(0), 0); + Assert.assertEquals(count2.getAndSet(0), 0); + Assert.assertEquals(count3.getAndSet(0), 0); + + nonalignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + .processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals(0, row.size()); + count1.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(0, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(0, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })))); + + Assert.assertEquals(count1.get(), 0); + Assert.assertEquals(count2.get(), 0); + Assert.assertEquals(count3.get(), 0); + } catch (Exception e) { + e.printStackTrace(); + 
fail(e.getMessage()); + } + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 6c29e7c4c44..8e760edaef4 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -2049,6 +2049,24 @@ public class TsFileSequenceReader implements AutoCloseable { return result; } + /** + * Collects the data type of every timeseries in this file, keyed by full path. + * + * @return full path (device, path separator, measurement id) -> data type + */ + public Map<String, TSDataType> getFullPathDataTypeMap() throws IOException { + final Map<String, TSDataType> result = new HashMap<>(); + for (final String device : getAllDevices()) { + Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device); + for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap.values()) { + result.put( + device + TsFileConstant.PATH_SEPARATOR + timeseriesMetadata.getMeasurementId(), + timeseriesMetadata.getTSDataType()); + } + } + return result; + } + public Map<String, List<String>> getDeviceMeasurementsMap() throws IOException { Map<String, List<String>> result = new HashMap<>(); for (String device : getAllDevices()) {
