This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch upgradeMem in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c5b2ab090c76e588d3f945256357c50735e82543 Author: HTHou <[email protected]> AuthorDate: Tue Aug 3 11:29:56 2021 +0800 Optimize the Upgrade Tool rewrite logic --- .../apache/iotdb/db/tools/TsFileRewriteTool.java | 143 +++++++++++---------- .../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 32 ++--- 2 files changed, 89 insertions(+), 86 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java index e1e4bc7..54e1560 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java @@ -1,20 +1,16 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. */ package org.apache.iotdb.db.tools; @@ -158,11 +154,9 @@ public class TsFileRewriteTool implements AutoCloseable { int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES; reader.position(headerLength); // start to scan chunks and chunkGroups - List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>(); - List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>(); - List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>(); byte marker; - List<MeasurementSchema> measurementSchemaList = new ArrayList<>(); + + Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>(); String lastChunkGroupDeviceId = null; try { while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { @@ -176,7 +170,6 @@ public class TsFileRewriteTool implements AutoCloseable { header.getDataType(), header.getEncodingType(), header.getCompressionType()); - measurementSchemaList.add(measurementSchema); TSDataType dataType = header.getDataType(); TSEncoding encoding = header.getEncodingType(); List<PageHeader> pageHeadersInChunk = new ArrayList<>(); @@ -194,24 +187,19 @@ public class TsFileRewriteTool implements AutoCloseable { dataInChunk.add(pageData); dataSize -= pageHeader.getSerializedPageSize(); } - pageHeadersInChunkGroup.add(pageHeadersInChunk); - pageDataInChunkGroup.add(dataInChunk); - needToDecodeInfoInChunkGroup.add(needToDecodeInfo); + reEncodeChunk( + measurementSchema, + pageHeadersInChunk, + dataInChunk, + needToDecodeInfo, + chunkWritersInChunkGroup); break; case MetaMarker.CHUNK_GROUP_HEADER: ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader(); String deviceId = chunkGroupHeader.getDeviceID(); - if (lastChunkGroupDeviceId != null && !measurementSchemaList.isEmpty()) { - rewrite( - lastChunkGroupDeviceId, - measurementSchemaList, - pageHeadersInChunkGroup, - pageDataInChunkGroup, - needToDecodeInfoInChunkGroup); - pageHeadersInChunkGroup.clear(); - pageDataInChunkGroup.clear(); - measurementSchemaList.clear(); - needToDecodeInfoInChunkGroup.clear(); + if (lastChunkGroupDeviceId != null && !chunkWritersInChunkGroup.isEmpty()) { + reWriteChunkGroupToFile(lastChunkGroupDeviceId, chunkWritersInChunkGroup); + chunkWritersInChunkGroup.clear(); } lastChunkGroupDeviceId = deviceId; break; @@ -239,17 +227,9 @@ public class TsFileRewriteTool implements AutoCloseable { } } - if (!measurementSchemaList.isEmpty()) { - rewrite( - lastChunkGroupDeviceId, - measurementSchemaList, - pageHeadersInChunkGroup, - pageDataInChunkGroup, - needToDecodeInfoInChunkGroup); - pageHeadersInChunkGroup.clear(); - pageDataInChunkGroup.clear(); - measurementSchemaList.clear(); - needToDecodeInfoInChunkGroup.clear(); + if (!chunkWritersInChunkGroup.isEmpty()) { + reWriteChunkGroupToFile(lastChunkGroupDeviceId, chunkWritersInChunkGroup); + chunkWritersInChunkGroup.clear(); } // close upgraded tsFiles and generate resources for them for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) { @@ -305,30 +285,61 @@ public class TsFileRewriteTool implements AutoCloseable { * this case, we have to decode the data to points, and then rewrite the data points to different * chunkWriters, finally write chunks to their own upgraded TsFiles. */ - protected void rewrite( - String deviceId, - List<MeasurementSchema> schemas, - List<List<PageHeader>> pageHeadersInChunkGroup, - List<List<ByteBuffer>> dataInChunkGroup, - List<List<Boolean>> needToDecodeInfoInChunkGroup) + // protected void rewrite(String deviceId, List<MeasurementSchema> schemas, + // List<List<PageHeader>> pageHeadersInChunkGroup, List<List<ByteBuffer>> dataInChunkGroup, + // List<List<Boolean>> needToDecodeInfoInChunkGroup) throws IOException, PageException { + // Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new + // HashMap<>(); + // for (int i = 0; i < schemas.size(); i++) { + // MeasurementSchema schema = schemas.get(i); + // List<ByteBuffer> pageDataInChunk = dataInChunkGroup.get(i); + // List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i); + // List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i); + // valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType()); + // for (int j = 0; j < pageDataInChunk.size(); j++) { + // if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) { + // decodeAndWritePage(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup); + // } else { + // writePage(schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), + // chunkWritersInChunkGroup); + // } + // } + // } + // + // for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry : chunkWritersInChunkGroup + // .entrySet()) { + // long partitionId = entry.getKey(); + // TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId); + // tsFileIOWriter.startChunkGroup(deviceId); + // // write chunks to their own upgraded tsFiles + // for (IChunkWriter chunkWriter : entry.getValue().values()) { + // chunkWriter.writeToFileWriter(tsFileIOWriter); + // } + // tsFileIOWriter.endChunkGroup(); + // } + // } + + protected void reEncodeChunk( + MeasurementSchema schema, + List<PageHeader> pageHeadersInChunk, + List<ByteBuffer> pageDataInChunk, + List<Boolean> needToDecodeInfoInChunk, + Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) throws IOException, PageException { - Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>(); - for (int i = 0; i < schemas.size(); i++) { - MeasurementSchema schema = schemas.get(i); - List<ByteBuffer> pageDataInChunk = dataInChunkGroup.get(i); - List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i); - List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i); - valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType()); - for (int j = 0; j < pageDataInChunk.size(); j++) { - if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) { - decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup); - } else { - writePageInToFile( - schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), chunkWritersInChunkGroup); - } + valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType()); + for (int i = 0; i < pageDataInChunk.size(); i++) { + if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(i))) { + decodeAndWritePage(schema, pageDataInChunk.get(i), chunkWritersInChunkGroup); + } else { + writePage( + schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i), chunkWritersInChunkGroup); } } + } + protected void reWriteChunkGroupToFile( + String deviceId, Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) + throws IOException { for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry : chunkWritersInChunkGroup.entrySet()) { long partitionId = entry.getKey(); @@ -380,7 +391,7 @@ public class TsFileRewriteTool implements AutoCloseable { }); } - protected void writePageInToFile( + protected void writePage( MeasurementSchema schema, PageHeader pageHeader, ByteBuffer pageData, @@ -396,7 +407,7 @@ public class TsFileRewriteTool implements AutoCloseable { chunkWritersInChunkGroup.put(partitionId, chunkWriters); } - protected void decodeAndWritePageInToFiles( + protected void decodeAndWritePage( MeasurementSchema schema, ByteBuffer pageData, Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) diff --git a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java index 90646b5..1c48488 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java @@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -96,11 +97,8 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool { TSFileConfig.MAGIC_STRING.getBytes().length + TSFileConfig.VERSION_NUMBER_V2.getBytes().length; reader.position(headerLength); - List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>(); - List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>(); - List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>(); byte marker; - List<MeasurementSchema> measurementSchemaList = new ArrayList<>(); + Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>(); try { while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { switch (marker) { @@ -112,7 +110,6 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool { header.getDataType(), header.getEncodingType(), header.getCompressionType()); - measurementSchemaList.add(measurementSchema); TSDataType dataType = header.getDataType(); TSEncoding encoding = header.getEncodingType(); List<PageHeader> pageHeadersInChunk = new ArrayList<>(); @@ -141,25 +138,20 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool { // page data bytes + pageHeader.getCompressedSize()); } - pageHeadersInChunkGroup.add(pageHeadersInChunk); - pageDataInChunkGroup.add(dataInChunk); - needToDecodeInfoInChunkGroup.add(needToDecodeInfo); + reEncodeChunk( + measurementSchema, + pageHeadersInChunk, + dataInChunk, + needToDecodeInfo, + chunkWritersInChunkGroup); break; case MetaMarker.CHUNK_GROUP_HEADER: // this is the footer of a ChunkGroup in TsFileV2. ChunkGroupHeader chunkGroupFooter = ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter(); String deviceID = chunkGroupFooter.getDeviceID(); - rewrite( - deviceID, - measurementSchemaList, - pageHeadersInChunkGroup, - pageDataInChunkGroup, - needToDecodeInfoInChunkGroup); - pageHeadersInChunkGroup.clear(); - pageDataInChunkGroup.clear(); - measurementSchemaList.clear(); - needToDecodeInfoInChunkGroup.clear(); + reWriteChunkGroupToFile(deviceID, chunkWritersInChunkGroup); + chunkWritersInChunkGroup.clear(); break; case MetaMarker.VERSION: long version = ((TsFileSequenceReaderForV2) reader).readVersion(); @@ -216,7 +208,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool { currentMod = null; } } - } catch (IOException e2) { + } catch (Exception e2) { throw new IOException( "TsFile upgrade process cannot proceed at position " + reader.position() @@ -252,7 +244,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool { } @Override - protected void decodeAndWritePageInToFiles( + protected void decodeAndWritePage( MeasurementSchema schema, ByteBuffer pageData, Map<Long, Map<MeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
