This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-tablet-covert in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a5b1ac065af1a5103e7128d2184d6010ba66fbf8 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Jun 6 20:13:45 2023 +0800 support iterating tsfile --- .../core/event/impl/PipeTsFileInsertionEvent.java | 7 +- .../db/pipe/core/event/view/access/PipeRow.java | 20 +- .../TabletInsertionDataContainer.java | 3 +- .../TsFileInsertionDataContainer.java | 140 +++--- .../TsFileInsertionDataTabletIterator.java | 286 +++--------- .../event/TsFileInsertionDataContainerTest.java | 497 +++++++++++++++++++++ .../iotdb/tsfile/read/TsFileSequenceReader.java | 18 + 7 files changed, 691 insertions(+), 280 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java index c813619be9a..2de0cd31d19 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileInsertionEvent { @@ -152,12 +153,16 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns } return dataContainer.toTabletInsertionEvents(); } catch (InterruptedException e) { - String errorMsg = + final String errorMsg = String.format( "Interrupted when waiting for closing TsFile %s.", resource.getTsFilePath()); LOGGER.warn(errorMsg); Thread.currentThread().interrupt(); throw new PipeException(errorMsg); + } catch (IOException e) { + final String errorMsg = String.format("Read TsFile %s error.", resource.getTsFilePath()); + LOGGER.warn(errorMsg); + throw new PipeException(errorMsg); } } diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java index 4a8bd65bd30..85f4a210ccb 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/access/PipeRow.java @@ -107,7 +107,25 @@ public class PipeRow implements Row { @Override public Object getObject(int columnIndex) { - return ((Object[]) valueColumns[columnIndex])[rowIndex]; + switch (getDataType(columnIndex)) { + case INT32: + return getInt(columnIndex); + case INT64: + return getLong(columnIndex); + case FLOAT: + return getFloat(columnIndex); + case DOUBLE: + return getDouble(columnIndex); + case BOOLEAN: + return getBoolean(columnIndex); + case TEXT: + return getBinary(columnIndex); + default: + throw new UnsupportedOperationException( + String.format( + "unsupported data type %s for column %s", + getDataType(columnIndex), columnNameStringList[columnIndex])); + } } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java index 56a6bc49632..542847074c0 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TabletInsertionDataContainer.java @@ -291,7 +291,6 @@ public class TabletInsertionDataContainer { // low cost check comes first if (pattern.length() == deviceId.length() + measurement.length() + 1 // high cost check comes later - && pattern.startsWith(deviceId) && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) { originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++; } @@ -307,7 +306,7 @@ public class 
TabletInsertionDataContainer { } final PipeRowCollector rowCollector = new PipeRowCollector(); - for (int i = 0; i < timestampColumn.length; i++) { + for (int i = 0; i < rowCount; i++) { consumer.accept( new PipeRow( i, diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java index fe804b31412..4bf0e001d5d 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataContainer.java @@ -21,11 +21,11 @@ package org.apache.iotdb.db.pipe.core.event.view.datastructure; import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; -import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TsFileReader; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; -import org.apache.iotdb.tsfile.write.record.Tablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,100 +37,120 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; -public class TsFileInsertionDataContainer { +public class TsFileInsertionDataContainer implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(TsFileInsertionDataContainer.class); - private final File tsFile; private final String pattern; - private TimeseriesMetadata vectorTimeseriesMetadata; + private final TsFileSequenceReader tsFileSequenceReader; + private final 
TsFileReader tsFileReader; - private final Map<String, List<TimeseriesMetadata>> device2TimeseriesMetadataMap; + private final Iterator<Map.Entry<String, List<String>>> deviceMeasurementsMapIterator; + private final Map<String, TSDataType> measurementDataTypeMap; - public TsFileInsertionDataContainer(File tsFile, String pattern) { - this.tsFile = tsFile; + public TsFileInsertionDataContainer(File tsFile, String pattern) throws IOException { this.pattern = pattern; - this.device2TimeseriesMetadataMap = collectDevice2TimeseriesMetadataMap(); - } - - private Map<String, List<TimeseriesMetadata>> collectDevice2TimeseriesMetadataMap() { - final Map<String, List<TimeseriesMetadata>> result = new HashMap<>(); + tsFileSequenceReader = new TsFileSequenceReader(tsFile.getAbsolutePath()); + tsFileReader = new TsFileReader(tsFileSequenceReader); - try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getPath())) { - // match pattern - for (Map.Entry<String, List<TimeseriesMetadata>> entry : - reader.getAllTimeseriesMetadata(true).entrySet()) { - final String device = entry.getKey(); - boolean isVector = false; + final Map<String, List<String>> filteredDeviceMeasurementsMap = + filterDeviceMeasurementsMapByPattern(); + deviceMeasurementsMapIterator = filteredDeviceMeasurementsMap.entrySet().iterator(); + measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap(); + } - // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c - // in this case, all data can be matched without checking the measurements - if (pattern == null || pattern.length() <= device.length() && device.startsWith(pattern)) { - result.put(device, entry.getValue()); - } + private Map<String, List<String>> filterDeviceMeasurementsMapByPattern() throws IOException { + final Map<String, List<String>> filteredDeviceMeasurementsMap = new HashMap<>(); - // case 2: for example, pattern is root.a.b.c and device is root.a.b - // in this case, we need to check the 
full path - else { - final List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); - - for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) { - // TODO: test me!!! - if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) { - vectorTimeseriesMetadata = timeseriesMetadata; - isVector = false; - continue; - } + for (Map.Entry<String, List<String>> entry : + tsFileSequenceReader.getDeviceMeasurementsMap().entrySet()) { + final String deviceId = entry.getKey(); - final String measurement = timeseriesMetadata.getMeasurementId(); - // low cost check comes first - if (pattern.length() == measurement.length() + device.length() + 1 - // high cost check comes later - && pattern.startsWith(device) - && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) { - if (!isVector) { - isVector = true; - timeseriesMetadataList.add(vectorTimeseriesMetadata); - } - timeseriesMetadataList.add(timeseriesMetadata); - } - } + // case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c + // in this case, all data can be matched without checking the measurements + if (pattern == null + || pattern.length() <= deviceId.length() && deviceId.startsWith(pattern)) { + filteredDeviceMeasurementsMap.put(deviceId, entry.getValue()); + } - if (!timeseriesMetadataList.isEmpty()) { - result.put(device, timeseriesMetadataList); + // case 2: for example, pattern is root.a.b.c and device is root.a.b + // in this case, we need to check the full path + else if (pattern.length() > deviceId.length() && pattern.startsWith(deviceId)) { + final List<String> filteredMeasurements = new ArrayList<>(); + + for (final String measurement : entry.getValue()) { + // low cost check comes first + if (pattern.length() == deviceId.length() + measurement.length() + 1 + // high cost check comes later + && pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) { + filteredMeasurements.add(measurement); } } + + filteredDeviceMeasurementsMap.put(deviceId, 
filteredMeasurements); } - } catch (IOException e) { - LOGGER.error("Cannot read TsFile {}.", tsFile.getPath(), e); } - return result; + return filteredDeviceMeasurementsMap; } + /** @return TabletInsertionEvent in a streaming way */ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() { return () -> new Iterator<TabletInsertionEvent>() { - private final Iterator<Tablet> tabletIterator = constructTabletIterable().iterator(); + private TsFileInsertionDataTabletIterator tabletIterator = null; @Override public boolean hasNext() { - return tabletIterator.hasNext(); + return (tabletIterator != null && tabletIterator.hasNext()) + || deviceMeasurementsMapIterator.hasNext(); } @Override public TabletInsertionEvent next() { - return new PipeRawTabletInsertionEvent(tabletIterator.next()); + if (!hasNext()) { + throw new NoSuchElementException(); + } + + if (tabletIterator == null || !tabletIterator.hasNext()) { + final Map.Entry<String, List<String>> entry = deviceMeasurementsMapIterator.next(); + try { + tabletIterator = + new TsFileInsertionDataTabletIterator( + tsFileReader, measurementDataTypeMap, entry.getKey(), entry.getValue()); + } catch (IOException e) { + throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e); + } + } + + final TabletInsertionEvent next = + new PipeRawTabletInsertionEvent(tabletIterator.next()); + + if (!hasNext()) { + try { + close(); + } catch (Exception e) { + LOGGER.warn("Failed to close TsFileInsertionDataContainer", e); + } + } + + return next; } }; } - private Iterable<Tablet> constructTabletIterable() { - return () -> - new TsFileInsertionDataTabletIterator(tsFile.getPath(), device2TimeseriesMetadataMap); + @Override + public void close() throws Exception { + if (tsFileReader != null) { + tsFileReader.close(); + } + if (tsFileSequenceReader != null) { + tsFileSequenceReader.close(); + } } } diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java index 7100e68b753..15959adb7ee 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/view/datastructure/TsFileInsertionDataTabletIterator.java @@ -20,101 +20,72 @@ package org.apache.iotdb.db.pipe.core.event.view.datastructure; import org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; -import org.apache.iotdb.tsfile.encoding.decoder.Decoder; -import org.apache.iotdb.tsfile.file.MetaMarker; -import org.apache.iotdb.tsfile.file.header.ChunkHeader; -import org.apache.iotdb.tsfile.file.header.PageHeader; -import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; -import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.read.TsFileSequenceReader; -import org.apache.iotdb.tsfile.read.common.BatchData; -import org.apache.iotdb.tsfile.read.reader.page.PageReader; -import org.apache.iotdb.tsfile.read.reader.page.TimePageReader; -import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader; -import org.apache.iotdb.tsfile.utils.BitMap; -import org.apache.iotdb.tsfile.utils.TsPrimitiveType; +import org.apache.iotdb.tsfile.read.TsFileReader; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.expression.QueryExpression; +import 
org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.stream.Collectors; public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> { - private static Logger LOGGER = LoggerFactory.getLogger(TsFileInsertionDataTabletIterator.class); - private final TsFileSequenceReader reader; - private final String filePath; - private final Iterator<Map.Entry<String, List<TimeseriesMetadata>>> entriesIterator; - private Map.Entry<String, List<TimeseriesMetadata>> currentEntry; - private Iterator<TimeseriesMetadata> timeseriesMetadataIterator; - private TimeseriesMetadata currentTimeseriesMetadata; - private List<MeasurementSchema> measurementSchemas; + private final TsFileReader tsFileReader; + private final Map<String, TSDataType> measurementDataTypeMap; - private boolean isAligned; - private final List<long[]> timeBatches; - private long[] timestampsForAligned; + private final String deviceId; + private final List<String> measurements; - public TsFileInsertionDataTabletIterator( - String filePath, Map<String, List<TimeseriesMetadata>> device2TimeseriesMetadataMap) { - this.filePath = filePath; - this.entriesIterator = device2TimeseriesMetadataMap.entrySet().iterator(); - this.timeBatches = new ArrayList<>(); - this.currentEntry = null; - this.timeseriesMetadataIterator = null; - this.currentTimeseriesMetadata = null; - this.measurementSchemas = null; - this.isAligned = false; - this.timestampsForAligned = null; - try { - this.reader = new TsFileSequenceReader(filePath); - } catch (IOException e) { - throw new PipeException("Cannot create 
TsFileSequenceReader for file " + filePath, e); - } + private final QueryDataSet queryDataSet; - // Initialize timeseriesMetadataIterator if there is a next entry - if (entriesIterator.hasNext()) { - currentEntry = entriesIterator.next(); - timeseriesMetadataIterator = currentEntry.getValue().iterator(); - } else { - timeseriesMetadataIterator = - new Iterator<TimeseriesMetadata>() { - @Override - public boolean hasNext() { - return false; - } + public TsFileInsertionDataTabletIterator( + TsFileReader tsFileReader, + Map<String, TSDataType> measurementDataTypeMap, + String deviceId, + List<String> measurements) + throws IOException { + this.tsFileReader = tsFileReader; + this.measurementDataTypeMap = measurementDataTypeMap; + + this.deviceId = deviceId; + this.measurements = + measurements.stream() + .filter( + measurement -> + // time column in aligned time-series should not be a query column + measurement != null && !measurement.isEmpty()) + .sorted() + .collect(Collectors.toList()); + + this.queryDataSet = buildQueryDataSet(); + } - @Override - public TimeseriesMetadata next() { - return null; - } - }; + private QueryDataSet buildQueryDataSet() throws IOException { + final List<Path> paths = new ArrayList<>(); + for (String measurement : measurements) { + paths.add(new Path(deviceId, measurement, false)); } + return tsFileReader.query(QueryExpression.create(paths, null)); } @Override public boolean hasNext() { - boolean hasNext = timeseriesMetadataIterator.hasNext() || entriesIterator.hasNext(); - if (!hasNext) { - try { - reader.close(); - } catch (IOException e) { - LOGGER.warn("Cannot close TsFileSequenceReader for file {}", filePath, e); - } + try { + return queryDataSet.hasNext(); + } catch (IOException e) { + throw new PipeException("Failed to check next", e); } - return hasNext; } @Override @@ -123,165 +94,48 @@ public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> { throw new NoSuchElementException(); } - if 
(!timeseriesMetadataIterator.hasNext()) { - currentEntry = entriesIterator.next(); - timeseriesMetadataIterator = currentEntry.getValue().iterator(); - } - currentTimeseriesMetadata = timeseriesMetadataIterator.next(); - measurementSchemas = new ArrayList<>(); - - try (TsFileSequenceReader reader = new TsFileSequenceReader(filePath)) { - if (currentTimeseriesMetadata.getTSDataType() == TSDataType.VECTOR) { - processTimeseriesMetadata(currentTimeseriesMetadata, reader); - currentTimeseriesMetadata = timeseriesMetadataIterator.next(); - } - return processTimeseriesMetadata(currentTimeseriesMetadata, reader); + try { + return buildNextTablet(); } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private Tablet createTablet(long[] timestamps, Object[] values, BitMap[] bitMaps) { - long[] tmp; - - if (isAligned) { - if (timestampsForAligned == null) { - timestampsForAligned = timestamps; - return null; - } - tmp = timestampsForAligned; - } else { - tmp = timestamps; + throw new PipeException("Failed to build tablet", e); } - - // create tablet - int rowSize = tmp.length; - Tablet tablet = new Tablet(currentEntry.getKey(), measurementSchemas, rowSize); - tablet.timestamps = tmp; - tablet.values = values; - tablet.rowSize = rowSize; - tablet.bitMaps = bitMaps; - - return tablet; } - private Tablet processTimeseriesMetadata( - TimeseriesMetadata timeseriesMetadata, TsFileSequenceReader reader) { - int pageIndex = 0; - if (timeseriesMetadata.getTSDataType() == TSDataType.VECTOR) { - isAligned = true; - timeBatches.clear(); - } else { - MeasurementSchema measurementSchema = - new MeasurementSchema( - timeseriesMetadata.getMeasurementId(), timeseriesMetadata.getTSDataType()); - measurementSchemas.add(measurementSchema); + private Tablet buildNextTablet() throws IOException { + final List<MeasurementSchema> schemas = new ArrayList<>(); + for (final String measurement : measurements) { + final TSDataType dataType = + measurementDataTypeMap.get(deviceId + 
TsFileConstant.PATH_SEPARATOR + measurement); + schemas.add(new MeasurementSchema(measurement, dataType)); } + final Tablet tablet = new Tablet(deviceId, schemas); + tablet.initBitMaps(); - List<Byte> bitMapBytes = new ArrayList<>(); - List<Object> measurementValues = new ArrayList<>(); - List<Long> measurementTimestamps = new ArrayList<>(); - - for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) { - long offset = chunkMetadata.getOffsetOfChunkHeader(); - try { - reader.position(offset); - ChunkHeader header = reader.readChunkHeader(reader.readMarker()); - int dataSize = header.getDataSize(); - - Decoder defaultTimeDecoder = - Decoder.getDecoderByType( - TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), - TSDataType.INT64); - Decoder valueDecoder = - Decoder.getDecoderByType(header.getEncodingType(), header.getDataType()); - pageIndex = 0; - if (header.getDataType() == TSDataType.VECTOR) { - timeBatches.clear(); - } - - while (dataSize > 0) { - PageHeader pageHeader = - reader.readPageHeader( - header.getDataType(), (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); - ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType()); + while (queryDataSet.hasNext()) { + final RowRecord rowRecord = queryDataSet.next(); - // Time column chunk - if ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) - == TsFileConstant.TIME_COLUMN_MASK) { - TimePageReader timePageReader = - new TimePageReader(pageHeader, pageData, defaultTimeDecoder); - long[] timeBatch = timePageReader.getNextTimeBatch(); - timeBatches.add(timeBatch); + final int rowIndex = tablet.rowSize; - for (long time : timeBatch) { - measurementTimestamps.add(time); - } - } - // Value column chunk - else if ((header.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK) - == TsFileConstant.VALUE_COLUMN_MASK) { - ValuePageReader valuePageReader = - new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder); + 
tablet.addTimestamp(rowIndex, rowRecord.getTimestamp()); - for (byte value : valuePageReader.getBitmap()) { - bitMapBytes.add(value); - } - - for (TsPrimitiveType value : - valuePageReader.nextValueBatch(timeBatches.get(pageIndex))) { - measurementValues.add(value.getValue()); - } - } - - // NonAligned Chunk - else { - PageReader pageReader = - new PageReader( - pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null); - BatchData batchData = pageReader.getAllSatisfiedPageData(); - List<Integer> isNullList = new ArrayList<>(); - int index = 0; - while (batchData.hasCurrent()) { - measurementTimestamps.add(batchData.currentTime()); - Object value = batchData.currentValue(); - - if (value == null) { - isNullList.add(index); - } - measurementValues.add(value); - index++; - batchData.next(); - } - - BitMap bitmap = new BitMap(measurementTimestamps.size()); - for (int isNull : isNullList) { - bitmap.mark(isNull); - } - byte[] bytes = bitmap.getByteArray(); - for (byte value : bytes) { - bitMapBytes.add(value); - } - } - pageIndex++; - dataSize -= pageHeader.getSerializedPageSize(); + final List<Field> fields = rowRecord.getFields(); + final int fieldSize = fields.size(); + for (int i = 0; i < fieldSize; i++) { + final Field field = fields.get(i); + if (field == null || field.getDataType() == null) { + tablet.bitMaps[i].mark(rowIndex); + } else { + tablet.addValue(measurements.get(i), rowIndex, field.getObjectValue(field.getDataType())); } - } catch (IOException e) { - throw new UncheckedIOException(e); } - } - long[] timestamps = new long[measurementTimestamps.size()]; - for (int i = 0; i < measurementTimestamps.size(); i++) { - timestamps[i] = measurementTimestamps.get(i); - } + tablet.rowSize++; - byte[] byteArray = new byte[bitMapBytes.size()]; - for (int i = 0; i < bitMapBytes.size(); i++) { - byteArray[i] = bitMapBytes.get(i); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + break; + } } - BitMap[] bitMaps = new BitMap[] {new 
BitMap(byteArray.length, byteArray)}; - return createTablet(timestamps, measurementValues.toArray(), bitMaps); + return tablet; } } diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/core/event/TsFileInsertionDataContainerTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/TsFileInsertionDataContainerTest.java new file mode 100644 index 00000000000..b1d3ecec461 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/pipe/core/event/TsFileInsertionDataContainerTest.java @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.core.event; + +import org.apache.iotdb.db.pipe.core.event.impl.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.core.event.view.datastructure.TsFileInsertionDataContainer; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +public class TsFileInsertionDataContainerTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(TsFileInsertionDataContainerTest.class); + + private File alignedTsFile; + private File nonalignedTsFile; + + @After + public void tearDown() throws Exception { + if (alignedTsFile != null) { + alignedTsFile.delete(); + } + if (nonalignedTsFile != null) { + nonalignedTsFile.delete(); + } + } + + @Test + public void testToTabletInsertionEvents() throws Exception { + Set<Integer> deviceNumbers = new HashSet<>(); + deviceNumbers.add(1); + deviceNumbers.add(2); + + Set<Integer> measurementNumbers = new HashSet<>(); + measurementNumbers.add(1); + measurementNumbers.add(2); + + for (int deviceNumber : deviceNumbers) { + for (int measurementNumber : measurementNumbers) { + testToTabletInsertionEvents(deviceNumber, measurementNumber, 0); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1); + + testToTabletInsertionEvents(deviceNumber, measurementNumber, 999); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001); + + testToTabletInsertionEvents(deviceNumber, measurementNumber, 999 * 2 + 1); + 
testToTabletInsertionEvents(deviceNumber, measurementNumber, 1000); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1001 * 2 - 1); + + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025); + + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1023 * 2 + 1); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1024 * 2); + testToTabletInsertionEvents(deviceNumber, measurementNumber, 1025 * 2 - 1); + + testToTabletInsertionEvents(deviceNumber, measurementNumber, 10001); + } + } + } + + private void testToTabletInsertionEvents( + int deviceNumber, int measurementNumber, int rowNumberInOneDevice) throws Exception { + LOGGER.info( + "testToTabletInsertionEvents: deviceNumber = {}, measurementNumber = {}, rowNumberInOneDevice = {}", + deviceNumber, + measurementNumber, + rowNumberInOneDevice); + + alignedTsFile = + TsFileGeneratorUtils.generateAlignedTsFile( + "aligned.tsfile", + deviceNumber, + measurementNumber, + rowNumberInOneDevice, + 300, + 10000, + 700, + 50); + nonalignedTsFile = + TsFileGeneratorUtils.generateNonAlignedTsFile( + "nonaligned.tsfile", + deviceNumber, + measurementNumber, + rowNumberInOneDevice, + 300, + 10000, + 700, + 50); + + try (final TsFileInsertionDataContainer alignedContainer = + new TsFileInsertionDataContainer(alignedTsFile, "root"); + final TsFileInsertionDataContainer nonalignedContainer = + new TsFileInsertionDataContainer(nonalignedTsFile, "root"); ) { + AtomicInteger count1 = new AtomicInteger(0); + AtomicInteger count2 = new AtomicInteger(0); + AtomicInteger count3 = new AtomicInteger(0); + + alignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count1.incrementAndGet(); + } catch 
(IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals( + measurementNumber, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + })))); + + Assert.assertEquals(count1.getAndSet(0), deviceNumber * rowNumberInOneDevice); + Assert.assertEquals(count2.getAndSet(0), deviceNumber * rowNumberInOneDevice); + Assert.assertEquals(count3.getAndSet(0), deviceNumber * rowNumberInOneDevice); + + nonalignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + .processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count1.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals( + measurementNumber, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); 
+ } + })))); + + Assert.assertEquals(count1.get(), deviceNumber * rowNumberInOneDevice); + Assert.assertEquals(count2.get(), deviceNumber * rowNumberInOneDevice); + Assert.assertEquals(count3.get(), deviceNumber * rowNumberInOneDevice); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + AtomicReference<String> oneDeviceInAlignedTsFile = new AtomicReference<>(); + AtomicReference<String> oneMeasurementInAlignedTsFile = new AtomicReference<>(); + + AtomicReference<String> oneDeviceInUnalignedTsFile = new AtomicReference<>(); + AtomicReference<String> oneMeasurementInUnalignedTsFile = new AtomicReference<>(); + + try (TsFileSequenceReader alignedReader = + new TsFileSequenceReader(alignedTsFile.getAbsolutePath()); + TsFileSequenceReader nonalignedReader = + new TsFileSequenceReader(nonalignedTsFile.getAbsolutePath())) { + + alignedReader + .getDeviceMeasurementsMap() + .forEach( + (k, v) -> + v.stream() + .filter(p -> p != null && !p.isEmpty()) + .forEach( + p -> { + oneDeviceInAlignedTsFile.set(k); + oneMeasurementInAlignedTsFile.set(new Path(k, p, false).toString()); + })); + nonalignedReader + .getDeviceMeasurementsMap() + .forEach( + (k, v) -> + v.stream() + .filter(p -> p != null && !p.isEmpty()) + .forEach( + p -> { + oneDeviceInUnalignedTsFile.set(k); + oneMeasurementInUnalignedTsFile.set(new Path(k, p, false).toString()); + })); + } + + try (final TsFileInsertionDataContainer alignedContainer = + new TsFileInsertionDataContainer(alignedTsFile, oneDeviceInAlignedTsFile.get()); + final TsFileInsertionDataContainer nonalignedContainer = + new TsFileInsertionDataContainer( + nonalignedTsFile, oneDeviceInUnalignedTsFile.get()); ) { + AtomicInteger count1 = new AtomicInteger(0); + AtomicInteger count2 = new AtomicInteger(0); + AtomicInteger count3 = new AtomicInteger(0); + + alignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + 
Assert.assertEquals(measurementNumber, row.size()); + count1.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals( + measurementNumber, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + })))); + + Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice); + Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice); + Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice); + + nonalignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + .processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count1.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(measurementNumber, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals( + measurementNumber, row.size()); + count3.incrementAndGet(); + } catch 
(IOException e) { + throw new RuntimeException(e); + } + })))); + + Assert.assertEquals(count1.get(), rowNumberInOneDevice); + Assert.assertEquals(count2.get(), rowNumberInOneDevice); + Assert.assertEquals(count3.get(), rowNumberInOneDevice); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + try (final TsFileInsertionDataContainer alignedContainer = + new TsFileInsertionDataContainer(alignedTsFile, oneMeasurementInAlignedTsFile.get()); + final TsFileInsertionDataContainer nonalignedContainer = + new TsFileInsertionDataContainer( + nonalignedTsFile, oneMeasurementInUnalignedTsFile.get()); ) { + AtomicInteger count1 = new AtomicInteger(0); + AtomicInteger count2 = new AtomicInteger(0); + AtomicInteger count3 = new AtomicInteger(0); + + alignedContainer + .toTabletInsertionEvents() + .forEach( + event -> + event + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(1, row.size()); + count1.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(1, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals(1, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + })))); + + Assert.assertEquals(count1.getAndSet(0), rowNumberInOneDevice); + Assert.assertEquals(count2.getAndSet(0), rowNumberInOneDevice); + Assert.assertEquals(count3.getAndSet(0), rowNumberInOneDevice); + + nonalignedContainer + .toTabletInsertionEvents() + .forEach( + 
event -> + event + .processTablet( + (tablet, rowCollector) -> { + new PipeRawTabletInsertionEvent(tablet) + .processRowByRow( + (row, collector) -> { + try { + rowCollector.collectRow(row); + Assert.assertEquals(1, row.size()); + count1.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + }) + .forEach( + tabletInsertionEvent1 -> + tabletInsertionEvent1 + .processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(1, row.size()); + count2.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }) + .forEach( + tabletInsertionEvent2 -> + tabletInsertionEvent2.processRowByRow( + (row, collector) -> { + try { + collector.collectRow(row); + Assert.assertEquals(1, row.size()); + count3.incrementAndGet(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })))); + + Assert.assertEquals(count1.get(), rowNumberInOneDevice); + Assert.assertEquals(count2.get(), rowNumberInOneDevice); + Assert.assertEquals(count3.get(), rowNumberInOneDevice); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 6c29e7c4c44..8e760edaef4 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -2049,6 +2049,24 @@ public class TsFileSequenceReader implements AutoCloseable { return result; } + /** + * Collects the data type of every timeseries of every device found in this file. + * + * @return new map: full path (device, path separator, measurement id joined) -> data type + */ + public Map<String, TSDataType> getFullPathDataTypeMap() throws IOException { + final Map<String, TSDataType> result = new HashMap<>(); + for (final String device : getAllDevices()) { + Map<String, TimeseriesMetadata> timeseriesMetadataMap = readDeviceMetadata(device); + for 
(TimeseriesMetadata timeseriesMetadata : timeseriesMetadataMap.values()) { + result.put( + device + TsFileConstant.PATH_SEPARATOR + timeseriesMetadata.getMeasurementId(), + timeseriesMetadata.getTSDataType()); + } + } + return result; + } + public Map<String, List<String>> getDeviceMeasurementsMap() throws IOException { Map<String, List<String>> result = new HashMap<>(); for (String device : getAllDevices()) {
