This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 75077fb [IOTDB-1546] [To rel/0.12] Optimize the Upgrade Tool rewrite
logic to reduce the temp memory cost (#3670)
75077fb is described below
commit 75077fbfa62307e9cc0acdcd213d7b3c66f8ff27
Author: Haonan <[email protected]>
AuthorDate: Sun Aug 8 16:06:07 2021 +0800
[IOTDB-1546] [To rel/0.12] Optimize the Upgrade Tool rewrite logic to
reduce the temp memory cost (#3670)
---
.../apache/iotdb/db/tools/TsFileRewriteTool.java | 141 ++++++++-----------
.../db/tools/upgrade/TsFileOnlineUpgradeTool.java | 150 ++++++++++++---------
2 files changed, 147 insertions(+), 144 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index e1e4bc7..84b075f 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.tools;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -155,18 +156,22 @@ public class TsFileRewriteTool implements AutoCloseable {
if (!fileCheck()) {
return;
}
- int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length +
Byte.BYTES;
+ long headerLength = TSFileConfig.MAGIC_STRING.getBytes().length +
Byte.BYTES;
reader.position(headerLength);
// start to scan chunks and chunkGroups
- List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
- List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
- List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>();
byte marker;
- List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
- String lastChunkGroupDeviceId = null;
+
+ String deviceId = null;
+ boolean firstChunkInChunkGroup = true;
try {
while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
+ case MetaMarker.CHUNK_GROUP_HEADER:
+ ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+ deviceId = chunkGroupHeader.getDeviceID();
+ firstChunkInChunkGroup = true;
+ endChunkGroup();
+ break;
case MetaMarker.CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
ChunkHeader header = reader.readChunkHeader(marker);
@@ -176,7 +181,6 @@ public class TsFileRewriteTool implements AutoCloseable {
header.getDataType(),
header.getEncodingType(),
header.getCompressionType());
- measurementSchemaList.add(measurementSchema);
TSDataType dataType = header.getDataType();
TSEncoding encoding = header.getEncodingType();
List<PageHeader> pageHeadersInChunk = new ArrayList<>();
@@ -194,26 +198,14 @@ public class TsFileRewriteTool implements AutoCloseable {
dataInChunk.add(pageData);
dataSize -= pageHeader.getSerializedPageSize();
}
- pageHeadersInChunkGroup.add(pageHeadersInChunk);
- pageDataInChunkGroup.add(dataInChunk);
- needToDecodeInfoInChunkGroup.add(needToDecodeInfo);
- break;
- case MetaMarker.CHUNK_GROUP_HEADER:
- ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
- String deviceId = chunkGroupHeader.getDeviceID();
- if (lastChunkGroupDeviceId != null &&
!measurementSchemaList.isEmpty()) {
- rewrite(
- lastChunkGroupDeviceId,
- measurementSchemaList,
- pageHeadersInChunkGroup,
- pageDataInChunkGroup,
- needToDecodeInfoInChunkGroup);
- pageHeadersInChunkGroup.clear();
- pageDataInChunkGroup.clear();
- measurementSchemaList.clear();
- needToDecodeInfoInChunkGroup.clear();
- }
- lastChunkGroupDeviceId = deviceId;
+ reWriteChunk(
+ deviceId,
+ firstChunkInChunkGroup,
+ measurementSchema,
+ pageHeadersInChunk,
+ dataInChunk,
+ needToDecodeInfo);
+ firstChunkInChunkGroup = false;
break;
case MetaMarker.OPERATION_INDEX_RANGE:
reader.readPlanIndex();
@@ -238,19 +230,7 @@ public class TsFileRewriteTool implements AutoCloseable {
MetaMarker.handleUnexpectedMarker(marker);
}
}
-
- if (!measurementSchemaList.isEmpty()) {
- rewrite(
- lastChunkGroupDeviceId,
- measurementSchemaList,
- pageHeadersInChunkGroup,
- pageDataInChunkGroup,
- needToDecodeInfoInChunkGroup);
- pageHeadersInChunkGroup.clear();
- pageDataInChunkGroup.clear();
- measurementSchemaList.clear();
- needToDecodeInfoInChunkGroup.clear();
- }
+ endChunkGroup();
// close upgraded tsFiles and generate resources for them
for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
rewrittenResources.add(endFileAndGenerateResource(tsFileIOWriter));
@@ -301,43 +281,42 @@ public class TsFileRewriteTool implements AutoCloseable {
}
/**
- * This method is for rewriting the ChunkGroup which data is in the
different time partitions. In
- * this case, we have to decode the data to points, and then rewrite the
data points to different
+ * This method is for rewriting the Chunk which data is in the different
time partitions. In this
+ * case, we have to decode the data to points, and then rewrite the data
points to different
* chunkWriters, finally write chunks to their own upgraded TsFiles.
*/
- protected void rewrite(
+ protected void reWriteChunk(
String deviceId,
- List<MeasurementSchema> schemas,
- List<List<PageHeader>> pageHeadersInChunkGroup,
- List<List<ByteBuffer>> dataInChunkGroup,
- List<List<Boolean>> needToDecodeInfoInChunkGroup)
+ boolean firstChunkInChunkGroup,
+ MeasurementSchema schema,
+ List<PageHeader> pageHeadersInChunk,
+ List<ByteBuffer> pageDataInChunk,
+ List<Boolean> needToDecodeInfoInChunk)
throws IOException, PageException {
- Map<Long, Map<MeasurementSchema, ChunkWriterImpl>>
chunkWritersInChunkGroup = new HashMap<>();
- for (int i = 0; i < schemas.size(); i++) {
- MeasurementSchema schema = schemas.get(i);
- List<ByteBuffer> pageDataInChunk = dataInChunkGroup.get(i);
- List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
- List<Boolean> needToDecodeInfoInChunk =
needToDecodeInfoInChunkGroup.get(i);
- valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(),
schema.getType());
- for (int j = 0; j < pageDataInChunk.size(); j++) {
- if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
- decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j),
chunkWritersInChunkGroup);
- } else {
- writePageInToFile(
- schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j),
chunkWritersInChunkGroup);
- }
+ valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(),
schema.getType());
+ Map<Long, ChunkWriterImpl> partitionChunkWriterMap = new HashMap<>();
+ for (int i = 0; i < pageDataInChunk.size(); i++) {
+ if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(i))) {
+ decodeAndWritePage(schema, pageDataInChunk.get(i),
partitionChunkWriterMap);
+ } else {
+ writePage(
+ schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i),
partitionChunkWriterMap);
}
}
-
- for (Entry<Long, Map<MeasurementSchema, ChunkWriterImpl>> entry :
- chunkWritersInChunkGroup.entrySet()) {
+ for (Entry<Long, ChunkWriterImpl> entry :
partitionChunkWriterMap.entrySet()) {
long partitionId = entry.getKey();
TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId);
- tsFileIOWriter.startChunkGroup(deviceId);
- // write chunks to their own upgraded tsFiles
- for (IChunkWriter chunkWriter : entry.getValue().values()) {
- chunkWriter.writeToFileWriter(tsFileIOWriter);
+ if (firstChunkInChunkGroup) {
+ tsFileIOWriter.startChunkGroup(deviceId);
}
+ // write chunks to their own upgraded tsFiles
+ IChunkWriter chunkWriter = entry.getValue();
+ chunkWriter.writeToFileWriter(tsFileIOWriter);
+ }
+ }
+
+ protected void endChunkGroup() throws IOException, PageException {
+ for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
tsFileIOWriter.endChunkGroup();
}
}
@@ -380,46 +359,42 @@ public class TsFileRewriteTool implements AutoCloseable {
});
}
- protected void writePageInToFile(
+ protected void writePage(
MeasurementSchema schema,
PageHeader pageHeader,
ByteBuffer pageData,
- Map<Long, Map<MeasurementSchema, ChunkWriterImpl>>
chunkWritersInChunkGroup)
+ Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
throws PageException {
long partitionId =
StorageEngine.getTimePartition(pageHeader.getStartTime());
getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
- Map<MeasurementSchema, ChunkWriterImpl> chunkWriters =
- chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
- ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new
ChunkWriterImpl(schema));
+ ChunkWriterImpl chunkWriter =
+ partitionChunkWriterMap.computeIfAbsent(partitionId, v -> new
ChunkWriterImpl(schema));
chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader);
- chunkWriters.put(schema, chunkWriter);
- chunkWritersInChunkGroup.put(partitionId, chunkWriters);
}
- protected void decodeAndWritePageInToFiles(
+ protected void decodeAndWritePage(
MeasurementSchema schema,
ByteBuffer pageData,
- Map<Long, Map<MeasurementSchema, ChunkWriterImpl>>
chunkWritersInChunkGroup)
+ Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
throws IOException {
valueDecoder.reset();
PageReader pageReader =
new PageReader(pageData, schema.getType(), valueDecoder,
defaultTimeDecoder, null);
BatchData batchData = pageReader.getAllSatisfiedPageData();
- rewritePageIntoFiles(batchData, schema, chunkWritersInChunkGroup);
+ rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap);
}
protected void rewritePageIntoFiles(
BatchData batchData,
MeasurementSchema schema,
- Map<Long, Map<MeasurementSchema, ChunkWriterImpl>>
chunkWritersInChunkGroup) {
+ Map<Long, ChunkWriterImpl> partitionChunkWriterMap) {
while (batchData.hasCurrent()) {
long time = batchData.currentTime();
Object value = batchData.currentValue();
long partitionId = StorageEngine.getTimePartition(time);
- Map<MeasurementSchema, ChunkWriterImpl> chunkWriters =
- chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
- ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new
ChunkWriterImpl(schema));
+ ChunkWriterImpl chunkWriter =
+ partitionChunkWriterMap.computeIfAbsent(partitionId, v -> new
ChunkWriterImpl(schema));
getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
switch (schema.getType()) {
case INT32:
@@ -445,8 +420,6 @@ public class TsFileRewriteTool implements AutoCloseable {
String.format("Data type %s is not supported.",
schema.getType()));
}
batchData.next();
- chunkWriters.put(schema, chunkWriter);
- chunkWritersInChunkGroup.put(partitionId, chunkWriters);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
index 90646b5..c86b940 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/upgrade/TsFileOnlineUpgradeTool.java
@@ -92,74 +92,103 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
return;
}
- int headerLength =
+ long headerLength =
TSFileConfig.MAGIC_STRING.getBytes().length
+ TSFileConfig.VERSION_NUMBER_V2.getBytes().length;
reader.position(headerLength);
- List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
- List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
- List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>();
byte marker;
- List<MeasurementSchema> measurementSchemaList = new ArrayList<>();
+ long firstChunkPositionInChunkGroup = headerLength;
+ boolean firstChunkInChunkGroup = true;
+ String deviceId = null;
+ boolean skipReadingChunk = true;
try {
while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
case MetaMarker.CHUNK_HEADER:
- ChunkHeader header = ((TsFileSequenceReaderForV2)
reader).readChunkHeader();
- MeasurementSchema measurementSchema =
- new MeasurementSchema(
- header.getMeasurementID(),
- header.getDataType(),
- header.getEncodingType(),
- header.getCompressionType());
- measurementSchemaList.add(measurementSchema);
- TSDataType dataType = header.getDataType();
- TSEncoding encoding = header.getEncodingType();
- List<PageHeader> pageHeadersInChunk = new ArrayList<>();
- List<ByteBuffer> dataInChunk = new ArrayList<>();
- List<Boolean> needToDecodeInfo = new ArrayList<>();
- int dataSize = header.getDataSize();
- while (dataSize > 0) {
- // a new Page
- PageHeader pageHeader = ((TsFileSequenceReaderForV2)
reader).readPageHeader(dataType);
- boolean needToDecode = checkIfNeedToDecode(dataType, encoding,
pageHeader);
- needToDecodeInfo.add(needToDecode);
- ByteBuffer pageData =
- !needToDecode
- ? ((TsFileSequenceReaderForV2)
reader).readCompressedPage(pageHeader)
- : reader.readPage(pageHeader,
header.getCompressionType());
- pageHeadersInChunk.add(pageHeader);
- dataInChunk.add(pageData);
- dataSize -=
- (Integer.BYTES * 2 // the bytes size of uncompressedSize and
compressedSize
- // count, startTime, endTime bytes size in old statistics
- + 24
- // statistics bytes size
- // new boolean StatsSize is 8 bytes larger than old one
- + (pageHeader.getStatistics().getStatsSize()
- - (dataType == TSDataType.BOOLEAN ? 8 : 0))
- // page data bytes
- + pageHeader.getCompressedSize());
+ if (skipReadingChunk || deviceId == null) {
+ ChunkHeader header = ((TsFileSequenceReaderForV2)
reader).readChunkHeader();
+ int dataSize = header.getDataSize();
+ while (dataSize > 0) {
+ // a new Page
+ PageHeader pageHeader =
+ ((TsFileSequenceReaderForV2)
reader).readPageHeader(header.getDataType());
+ ((TsFileSequenceReaderForV2)
reader).readCompressedPage(pageHeader);
+ dataSize -=
+ (Integer.BYTES * 2 // the bytes size of uncompressedSize
and compressedSize
+ // count, startTime, endTime bytes size in old
statistics
+ + 24
+ // statistics bytes size
+ // new boolean StatsSize is 8 bytes larger than old one
+ + (pageHeader.getStatistics().getStatsSize()
+ - (header.getDataType() == TSDataType.BOOLEAN ? 8
: 0))
+ // page data bytes
+ + pageHeader.getCompressedSize());
+ }
+ } else {
+ ChunkHeader header = ((TsFileSequenceReaderForV2)
reader).readChunkHeader();
+ MeasurementSchema measurementSchema =
+ new MeasurementSchema(
+ header.getMeasurementID(),
+ header.getDataType(),
+ header.getEncodingType(),
+ header.getCompressionType());
+ TSDataType dataType = header.getDataType();
+ TSEncoding encoding = header.getEncodingType();
+ List<PageHeader> pageHeadersInChunk = new ArrayList<>();
+ List<ByteBuffer> dataInChunk = new ArrayList<>();
+ List<Boolean> needToDecodeInfo = new ArrayList<>();
+ int dataSize = header.getDataSize();
+ while (dataSize > 0) {
+ // a new Page
+ PageHeader pageHeader =
+ ((TsFileSequenceReaderForV2)
reader).readPageHeader(dataType);
+ boolean needToDecode = checkIfNeedToDecode(dataType, encoding,
pageHeader);
+ needToDecodeInfo.add(needToDecode);
+ ByteBuffer pageData =
+ !needToDecode
+ ? ((TsFileSequenceReaderForV2)
reader).readCompressedPage(pageHeader)
+ : reader.readPage(pageHeader,
header.getCompressionType());
+ pageHeadersInChunk.add(pageHeader);
+ dataInChunk.add(pageData);
+ dataSize -=
+ (Integer.BYTES * 2 // the bytes size of uncompressedSize
and compressedSize
+ // count, startTime, endTime bytes size in old
statistics
+ + 24
+ // statistics bytes size
+ // new boolean StatsSize is 8 bytes larger than old one
+ + (pageHeader.getStatistics().getStatsSize()
+ - (dataType == TSDataType.BOOLEAN ? 8 : 0))
+ // page data bytes
+ + pageHeader.getCompressedSize());
+ }
+ reWriteChunk(
+ deviceId,
+ firstChunkInChunkGroup,
+ measurementSchema,
+ pageHeadersInChunk,
+ dataInChunk,
+ needToDecodeInfo);
+ if (firstChunkInChunkGroup) {
+ firstChunkInChunkGroup = false;
+ }
}
- pageHeadersInChunkGroup.add(pageHeadersInChunk);
- pageDataInChunkGroup.add(dataInChunk);
- needToDecodeInfoInChunkGroup.add(needToDecodeInfo);
break;
case MetaMarker.CHUNK_GROUP_HEADER:
// this is the footer of a ChunkGroup in TsFileV2.
- ChunkGroupHeader chunkGroupFooter =
- ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
- String deviceID = chunkGroupFooter.getDeviceID();
- rewrite(
- deviceID,
- measurementSchemaList,
- pageHeadersInChunkGroup,
- pageDataInChunkGroup,
- needToDecodeInfoInChunkGroup);
- pageHeadersInChunkGroup.clear();
- pageDataInChunkGroup.clear();
- measurementSchemaList.clear();
- needToDecodeInfoInChunkGroup.clear();
+ if (skipReadingChunk) {
+ skipReadingChunk = false;
+ ChunkGroupHeader chunkGroupFooter =
+ ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
+ deviceId = chunkGroupFooter.getDeviceID();
+ reader.position(firstChunkPositionInChunkGroup);
+ } else {
+ endChunkGroup();
+ skipReadingChunk = true;
+ ((TsFileSequenceReaderForV2) reader).readChunkGroupFooter();
+ deviceId = null;
+ firstChunkPositionInChunkGroup = reader.position();
+ firstChunkInChunkGroup = true;
+ }
break;
case MetaMarker.VERSION:
long version = ((TsFileSequenceReaderForV2) reader).readVersion();
@@ -187,6 +216,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
tsFileIOWriter.writePlanIndices();
}
+ firstChunkPositionInChunkGroup = reader.position();
break;
default:
// the disk file is corrupted, using this file may be dangerous
@@ -216,7 +246,7 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
currentMod = null;
}
}
- } catch (IOException e2) {
+ } catch (Exception e2) {
throw new IOException(
"TsFile upgrade process cannot proceed at position "
+ reader.position()
@@ -252,16 +282,16 @@ public class TsFileOnlineUpgradeTool extends TsFileRewriteTool {
}
@Override
- protected void decodeAndWritePageInToFiles(
+ protected void decodeAndWritePage(
MeasurementSchema schema,
ByteBuffer pageData,
- Map<Long, Map<MeasurementSchema, ChunkWriterImpl>>
chunkWritersInChunkGroup)
+ Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
throws IOException {
valueDecoder.reset();
PageReaderV2 pageReader =
new PageReaderV2(pageData, schema.getType(), valueDecoder,
defaultTimeDecoder, null);
BatchData batchData = pageReader.getAllSatisfiedPageData();
- rewritePageIntoFiles(batchData, schema, chunkWritersInChunkGroup);
+ rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap);
}
/** check if the file to be upgraded has correct magic strings and version
number */