This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch cp_upgrade_mem in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4608139a7f9a485412a85c89b114c74a76e57f37 Author: Haonan <[email protected]> AuthorDate: Sun Aug 8 16:06:07 2021 +0800 [IOTDB-1546] Optimize the Upgrade/Rewrite Tool rewrite logic to reduce the temp memory cost --- .../apache/iotdb/db/tools/TsFileRewriteTool.java | 148 ++++++++------------ .../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 153 ++++++++++++--------- 2 files changed, 151 insertions(+), 150 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java index c52457b..d5b5934 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.tools; import org.apache.iotdb.db.engine.StorageEngine; @@ -44,7 +45,6 @@ import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; -import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; @@ -156,18 +156,22 @@ public class TsFileRewriteTool implements AutoCloseable { if (!fileCheck()) { return; } - int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES; + long headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES; reader.position(headerLength); // start to scan chunks and chunkGroups - List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>(); - List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>(); - List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>(); byte marker; - List<IMeasurementSchema> measurementSchemaList = new ArrayList<>(); - String lastChunkGroupDeviceId = null; + + String deviceId = null; + boolean firstChunkInChunkGroup = true; try { while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { switch (marker) { + case MetaMarker.CHUNK_GROUP_HEADER: + ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader(); + deviceId = chunkGroupHeader.getDeviceID(); + firstChunkInChunkGroup = true; + endChunkGroup(); + break; case MetaMarker.CHUNK_HEADER: case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: ChunkHeader header = reader.readChunkHeader(marker); @@ -177,7 +181,6 @@ public class TsFileRewriteTool implements AutoCloseable { header.getDataType(), header.getEncodingType(), header.getCompressionType()); - measurementSchemaList.add(measurementSchema); TSDataType dataType = header.getDataType(); TSEncoding encoding = header.getEncodingType(); List<PageHeader> pageHeadersInChunk = new ArrayList<>(); @@ -195,26 +198,14 @@ public class TsFileRewriteTool implements AutoCloseable { dataInChunk.add(pageData); dataSize -= pageHeader.getSerializedPageSize(); } - pageHeadersInChunkGroup.add(pageHeadersInChunk); - pageDataInChunkGroup.add(dataInChunk); - needToDecodeInfoInChunkGroup.add(needToDecodeInfo); - break; - case MetaMarker.CHUNK_GROUP_HEADER: - ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader(); - String deviceId = chunkGroupHeader.getDeviceID(); - if (lastChunkGroupDeviceId != null && !measurementSchemaList.isEmpty()) { - rewrite( - lastChunkGroupDeviceId, - measurementSchemaList, - pageHeadersInChunkGroup, - pageDataInChunkGroup, - needToDecodeInfoInChunkGroup); - pageHeadersInChunkGroup.clear(); - pageDataInChunkGroup.clear(); - measurementSchemaList.clear(); - needToDecodeInfoInChunkGroup.clear(); - } - lastChunkGroupDeviceId = deviceId; + reWriteChunk( + deviceId, + firstChunkInChunkGroup, + measurementSchema, + pageHeadersInChunk, + dataInChunk, + needToDecodeInfo); + firstChunkInChunkGroup = false; break; case MetaMarker.OPERATION_INDEX_RANGE: reader.readPlanIndex(); @@ -239,19 +230,7 @@ public class TsFileRewriteTool implements AutoCloseable { MetaMarker.handleUnexpectedMarker(marker); } } - - if (!measurementSchemaList.isEmpty()) { - rewrite( - lastChunkGroupDeviceId, - measurementSchemaList, - pageHeadersInChunkGroup, - pageDataInChunkGroup, - needToDecodeInfoInChunkGroup); - pageHeadersInChunkGroup.clear(); - pageDataInChunkGroup.clear(); - measurementSchemaList.clear(); - needToDecodeInfoInChunkGroup.clear(); - } + endChunkGroup(); // close upgraded tsFiles and generate resources for them for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) { rewrittenResources.add(endFileAndGenerateResource(tsFileIOWriter)); @@ -302,43 +281,42 @@ public class TsFileRewriteTool implements AutoCloseable { } /** - * This method is for rewriting the ChunkGroup which data is in the different time partitions. In - * this case, we have to decode the data to points, and then rewrite the data points to different + * This method is for rewriting the Chunk which data is in the different time partitions. In this + * case, we have to decode the data to points, and then rewrite the data points to different * chunkWriters, finally write chunks to their own upgraded TsFiles. */ - protected void rewrite( + protected void reWriteChunk( String deviceId, - List<IMeasurementSchema> schemas, - List<List<PageHeader>> pageHeadersInChunkGroup, - List<List<ByteBuffer>> dataInChunkGroup, - List<List<Boolean>> needToDecodeInfoInChunkGroup) + boolean firstChunkInChunkGroup, + MeasurementSchema schema, + List<PageHeader> pageHeadersInChunk, + List<ByteBuffer> pageDataInChunk, + List<Boolean> needToDecodeInfoInChunk) throws IOException, PageException { - Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>(); - for (int i = 0; i < schemas.size(); i++) { - IMeasurementSchema schema = schemas.get(i); - List<ByteBuffer> pageDataInChunk = dataInChunkGroup.get(i); - List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i); - List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i); - valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType()); - for (int j = 0; j < pageDataInChunk.size(); j++) { - if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) { - decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup); - } else { - writePageInToFile( - schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), chunkWritersInChunkGroup); - } + valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType()); + Map<Long, ChunkWriterImpl> partitionChunkWriterMap = new HashMap<>(); + for (int i = 0; i < pageDataInChunk.size(); i++) { + if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(i))) { + decodeAndWritePage(schema, pageDataInChunk.get(i), partitionChunkWriterMap); + } else { + writePage( + schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i), partitionChunkWriterMap); } } - - for (Entry<Long, Map<IMeasurementSchema, ChunkWriterImpl>> entry : - chunkWritersInChunkGroup.entrySet()) { + for (Entry<Long, ChunkWriterImpl> entry : partitionChunkWriterMap.entrySet()) { long partitionId = entry.getKey(); TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId); - tsFileIOWriter.startChunkGroup(deviceId); - // write chunks to their own upgraded tsFiles - for (IChunkWriter chunkWriter : entry.getValue().values()) { - chunkWriter.writeToFileWriter(tsFileIOWriter); + if (firstChunkInChunkGroup) { + tsFileIOWriter.startChunkGroup(deviceId); } + // write chunks to their own upgraded tsFiles + IChunkWriter chunkWriter = entry.getValue(); + chunkWriter.writeToFileWriter(tsFileIOWriter); + } + } + + protected void endChunkGroup() throws IOException, PageException { + for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) { tsFileIOWriter.endChunkGroup(); } } @@ -381,46 +359,42 @@ public class TsFileRewriteTool implements AutoCloseable { }); } - protected void writePageInToFile( - IMeasurementSchema schema, + protected void writePage( + MeasurementSchema schema, PageHeader pageHeader, ByteBuffer pageData, - Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) + Map<Long, ChunkWriterImpl> partitionChunkWriterMap) throws PageException { long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime()); getOrDefaultTsFileIOWriter(oldTsFile, partitionId); - Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters = - chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>()); - ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema)); + ChunkWriterImpl chunkWriter = + partitionChunkWriterMap.computeIfAbsent(partitionId, v -> new ChunkWriterImpl(schema)); chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader); - chunkWriters.put(schema, chunkWriter); - chunkWritersInChunkGroup.put(partitionId, chunkWriters); } - protected void decodeAndWritePageInToFiles( - IMeasurementSchema schema, + protected void decodeAndWritePage( + MeasurementSchema schema, ByteBuffer pageData, - Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) + Map<Long, ChunkWriterImpl> partitionChunkWriterMap) throws IOException { valueDecoder.reset(); PageReader pageReader = new PageReader(pageData, schema.getType(), valueDecoder, defaultTimeDecoder, null); BatchData batchData = pageReader.getAllSatisfiedPageData(); - rewritePageIntoFiles(batchData, schema, chunkWritersInChunkGroup); + rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap); } protected void rewritePageIntoFiles( BatchData batchData, - IMeasurementSchema schema, - Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) { + MeasurementSchema schema, + Map<Long, ChunkWriterImpl> partitionChunkWriterMap) { while (batchData.hasCurrent()) { long time = batchData.currentTime(); Object value = batchData.currentValue(); long partitionId = StorageEngine.getTimePartition(time); - Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters = - chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>()); - ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema)); + ChunkWriterImpl chunkWriter = + partitionChunkWriterMap.computeIfAbsent(partitionId, v -> new ChunkWriterImpl(schema)); getOrDefaultTsFileIOWriter(oldTsFile, partitionId); switch (schema.getType()) { case INT32: @@ -446,8 +420,6 @@ public class TsFileRewriteTool implements AutoCloseable { String.format("Data type %s is not supported.", schema.getType())); } batchData.next(); - chunkWriters.put(schema, chunkWriter); - chunkWritersInChunkGroup.put(partitionId, chunkWriters); } } diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java index 8b04bba..d3cb82d 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java @@ -36,7 +36,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2; import org.apache.iotdb.tsfile.v2.read.reader.page.PageReaderV2; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; -import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; @@ -89,74 +88,103 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool { return; } - int headerLength = + long headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER_V2.getBytes().length; reader.position(headerLength); - List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>(); - List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>(); - List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>(); byte marker; - List<IMeasurementSchema> measurementSchemaList = new ArrayList<>(); + long firstChunkPositionInChunkGroup = headerLength; + boolean firstChunkInChunkGroup = true; + String deviceId = null; + boolean skipReadingChunk = true; try { while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { switch (marker) { case MetaMarker.CHUNK_HEADER: - ChunkHeader header = ((TsFileSequenceReaderForV2) reader).readChunkHeader(); - IMeasurementSchema measurementSchema = - new MeasurementSchema( - header.getMeasurementID(), - header.getDataType(), - header.getEncodingType(), - header.getCompressionType()); - measurementSchemaList.add(measurementSchema); - TSDataType dataType = header.getDataType(); - TSEncoding encoding = header.getEncodingType(); - List<PageHeader> pageHeadersInChunk = new ArrayList<>(); - List<ByteBuffer> dataInChunk = new ArrayList<>(); - List<Boolean> needToDecodeInfo = new ArrayList<>(); - int dataSize = header.getDataSize(); - while (dataSize > 0) { - // a new Page - PageHeader pageHeader = ((TsFileSequenceReaderForV2) reader).readPageHeader(dataType); - boolean needToDecode = checkIfNeedToDecode(dataType, encoding, pageHeader); - needToDecodeInfo.add(needToDecode); - ByteBuffer pageData = - !needToDecode - ? ((TsFileSequenceReaderForV2) reader).readCompressedPage(pageHeader) - : reader.readPage(pageHeader, header.getCompressionType()); - pageHeadersInChunk.add(pageHeader); - dataInChunk.add(pageData); - dataSize -= - (Integer.BYTES * 2 // the bytes size of uncompressedSize and compressedSize - // count, startTime, endTime bytes size in old statistics - + 24 - // statistics bytes size - // new boolean StatsSize is 8 bytes larger than old one - + (pageHeader.getStatistics().getStatsSize() - - (dataType == TSDataType.BOOLEAN ? 8 : 0)) - // page data bytes - + pageHeader.getCompressedSize()); + if (skipReadingChunk || deviceId == null) { + ChunkHeader header = ((TsFileSequenceReaderForV2) reader).readChunkHeader(); + int dataSize = header.getDataSize(); + while (dataSize > 0) { + // a new Page + PageHeader pageHeader = + ((TsFileSequenceReaderForV2) reader).readPageHeader(header.getDataType()); + ((TsFileSequenceReaderForV2) reader).readCompressedPage(pageHeader); + dataSize -= + (Integer.BYTES * 2 // the bytes size of uncompressedSize and compressedSize + // count, startTime, endTime bytes size in old statistics + + 24 + // statistics bytes size + // new boolean StatsSize is 8 bytes larger than old one + + (pageHeader.getStatistics().getStatsSize() + - (header.getDataType() == TSDataType.BOOLEAN ? 8 : 0)) + // page data bytes + + pageHeader.getCompressedSize()); + } + } else { + ChunkHeader header = ((TsFileSequenceReaderForV2) reader).readChunkHeader(); + MeasurementSchema measurementSchema = + new MeasurementSchema( + header.getMeasurementID(), + header.getDataType(), + header.getEncodingType(), + header.getCompressionType()); + TSDataType dataType = header.getDataType(); + TSEncoding encoding = header.getEncodingType(); + List<PageHeader> pageHeadersInChunk = new ArrayList<>(); + List<ByteBuffer> dataInChunk = new ArrayList<>(); + List<Boolean> needToDecodeInfo = new ArrayList<>(); + int dataSize = header.getDataSize(); + while (dataSize > 0) { + // a new Page + PageHeader pageHeader = + ((TsFileSequenceReaderForV2) reader).readPageHeader(dataType); + boolean needToDecode = checkIfNeedToDecode(dataType, encoding, pageHeader); + needToDecodeInfo.add(needToDecode); + ByteBuffer pageData = + !needToDecode + ? ((TsFileSequenceReaderForV2) reader).readCompressedPage(pageHeader) + : reader.readPage(pageHeader, header.getCompressionType()); + pageHeadersInChunk.add(pageHeader); + dataInChunk.add(pageData); + dataSize -= + (Integer.BYTES * 2 // the bytes size of uncompressedSize and compressedSize + // count, startTime, endTime bytes size in old statistics + + 24 + // statistics bytes size + // new boolean StatsSize is 8 bytes larger than old one + + (pageHeader.getStatistics().getStatsSize() + - (dataType == TSDataType.BOOLEAN ? 8 : 0)) + // page data bytes + + pageHeader.getCompressedSize()); + } + reWriteChunk( + deviceId, + firstChunkInChunkGroup, + measurementSchema, + pageHeadersInChunk, + dataInChunk, + needToDecodeInfo); + if (firstChunkInChunkGroup) { + firstChunkInChunkGroup = false; + } } - pageHeadersInChunkGroup.add(pageHeadersInChunk); - pageDataInChunkGroup.add(dataInChunk); - needToDecodeInfoInChunkGroup.add(needToDecodeInfo); break; case MetaMarker.CHUNK_GROUP_HEADER: // this is the footer of a ChunkGroup in TsFileV2. - ChunkGroupHeader chunkGroupFooter = - ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter(); - String deviceID = chunkGroupFooter.getDeviceID(); - rewrite( - deviceID, - measurementSchemaList, - pageHeadersInChunkGroup, - pageDataInChunkGroup, - needToDecodeInfoInChunkGroup); - pageHeadersInChunkGroup.clear(); - pageDataInChunkGroup.clear(); - measurementSchemaList.clear(); - needToDecodeInfoInChunkGroup.clear(); + if (skipReadingChunk) { + skipReadingChunk = false; + ChunkGroupHeader chunkGroupFooter = + ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter(); + deviceId = chunkGroupFooter.getDeviceID(); + reader.position(firstChunkPositionInChunkGroup); + } else { + endChunkGroup(); + skipReadingChunk = true; + ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter(); + deviceId = null; + firstChunkPositionInChunkGroup = reader.position(); + firstChunkInChunkGroup = true; + } break; case MetaMarker.VERSION: long version = ((TsFileSequenceReaderForV2) reader).readVersion(); @@ -184,6 +212,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool { for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) { tsFileIOWriter.writePlanIndices(); } + firstChunkPositionInChunkGroup = reader.position(); break; default: // the disk file is corrupted, using this file may be dangerous @@ -213,7 +242,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool { currentMod = null; } } - } catch (IOException e2) { + } catch (Exception e2) { throw new IOException( "TsFile upgrade process cannot proceed at position " + reader.position() @@ -249,16 +278,16 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool { } @Override - protected void decodeAndWritePageInToFiles( - IMeasurementSchema schema, + protected void decodeAndWritePage( + MeasurementSchema schema, ByteBuffer pageData, - Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) + Map<Long, ChunkWriterImpl> partitionChunkWriterMap) throws IOException { valueDecoder.reset(); PageReaderV2 pageReader = new PageReaderV2(pageData, schema.getType(), valueDecoder, defaultTimeDecoder, null); BatchData batchData = pageReader.getAllSatisfiedPageData(); - rewritePageIntoFiles(batchData, schema, chunkWritersInChunkGroup); + rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap); } /** check if the file to be upgraded has correct magic strings and version number */
