This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 33ee473 [To rel/0.12][ISSUE-3378] Fix NPE when clearing the upgrade folder;
Fix statistics missing from some upgraded pageHeaders (#3376)
33ee473 is described below
commit 33ee473d1f2476180b4b6207fc1d25ef73205a43
Author: Haonan <[email protected]>
AuthorDate: Wed Jun 9 13:37:17 2021 +0800
[To rel/0.12][ISSUE-3378] Fix NPE when clearing the upgrade folder; Fix
statistics missing from some upgraded pageHeaders (#3376)
---
.../engine/storagegroup/StorageGroupProcessor.java | 3 +++
.../iotdb/db/engine/upgrade/UpgradeTask.java | 7 ++++-
.../apache/iotdb/db/tools/TsFileRewriteTool.java | 12 +++------
.../iotdb/tsfile/write/chunk/ChunkWriterImpl.java | 31 ++++++++++++++++------
4 files changed, 35 insertions(+), 18 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b2d4b60..ff626d6 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2101,6 +2101,7 @@ public class StorageGroupProcessor {
return;
}
for (TsFileResource resource : resources) {
+ resource.writeLock();
try {
UpgradeUtils.moveUpgradedFiles(resource);
tsFileManagement.addAll(resource.getUpgradedResources(), isseq);
@@ -2114,6 +2115,8 @@ public class StorageGroupProcessor {
resource.getTsFile().getAbsolutePath() + "," +
UpgradeCheckStatus.UPGRADE_SUCCESS);
} catch (IOException e) {
logger.error("Unable to load {}, caused by ", resource, e);
+ } finally {
+ resource.writeUnlock();
}
}
// delete upgrade folder when it is empty
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
index 76b1bac..bcf29d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
@@ -139,8 +139,13 @@ public class UpgradeTask extends WrappedRunnable {
}
File virtualStorageGroupDir = fsFactory.getFile(storageGroup, "0");
File upgradeDir = fsFactory.getFile(virtualStorageGroupDir, "upgrade");
-
+ if (upgradeDir == null) {
+ continue;
+ }
File[] tmpPartitionDirList = upgradeDir.listFiles();
+ if (tmpPartitionDirList == null) {
+ continue;
+ }
for (File tmpPartitionDir : tmpPartitionDirList) {
if (tmpPartitionDir.isDirectory()) {
try {
diff --git
a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
index 955ed77..e1e4bc7 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java
@@ -319,17 +319,12 @@ public class TsFileRewriteTool implements AutoCloseable {
List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
List<Boolean> needToDecodeInfoInChunk =
needToDecodeInfoInChunkGroup.get(i);
valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(),
schema.getType());
- boolean isOnlyOnePageChunk = pageDataInChunk.size() == 1;
for (int j = 0; j < pageDataInChunk.size(); j++) {
if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j),
chunkWritersInChunkGroup);
} else {
writePageInToFile(
- schema,
- pageHeadersInChunk.get(j),
- pageDataInChunk.get(j),
- chunkWritersInChunkGroup,
- isOnlyOnePageChunk);
+ schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j),
chunkWritersInChunkGroup);
}
}
}
@@ -389,15 +384,14 @@ public class TsFileRewriteTool implements AutoCloseable {
MeasurementSchema schema,
PageHeader pageHeader,
ByteBuffer pageData,
- Map<Long, Map<MeasurementSchema, ChunkWriterImpl>>
chunkWritersInChunkGroup,
- boolean isOnlyOnePageChunk)
+ Map<Long, Map<MeasurementSchema, ChunkWriterImpl>>
chunkWritersInChunkGroup)
throws PageException {
long partitionId =
StorageEngine.getTimePartition(pageHeader.getStartTime());
getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
Map<MeasurementSchema, ChunkWriterImpl> chunkWriters =
chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new
ChunkWriterImpl(schema));
- chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader,
isOnlyOnePageChunk);
+ chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader);
chunkWriters.put(schema, chunkWriter);
chunkWritersInChunkGroup.put(partitionId, chunkWriters);
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
index 39e5f79..48ff92c 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/chunk/ChunkWriterImpl.java
@@ -294,8 +294,7 @@ public class ChunkWriterImpl implements IChunkWriter {
if (numOfPages == 0) { // record the firstPageStatistics
this.firstPageStatistics = pageWriter.getStatistics();
this.sizeWithoutStatistic =
pageWriter.writePageHeaderAndDataIntoBuff(pageBuffer, true);
- } else if (numOfPages == 1
- && firstPageStatistics != null) { // put the firstPageStatistics
into pageBuffer
+ } else if (numOfPages == 1) { // put the firstPageStatistics into
pageBuffer
byte[] b = pageBuffer.toByteArray();
pageBuffer.reset();
pageBuffer.write(b, 0, this.sizeWithoutStatistic);
@@ -374,16 +373,32 @@ public class ChunkWriterImpl implements IChunkWriter {
* write the page header and data into the PageWriter's output stream.
@NOTE: for upgrading
* 0.11/v2 to 0.12/v3 TsFile
*/
- public void writePageHeaderAndDataIntoBuff(
- ByteBuffer data, PageHeader header, boolean isOnlyOnePageChunk) throws
PageException {
-
+ public void writePageHeaderAndDataIntoBuff(ByteBuffer data, PageHeader
header)
+ throws PageException {
// write the page header to pageBuffer
try {
logger.debug(
"start to flush a page header into buffer, buffer position {} ",
pageBuffer.size());
-
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(),
pageBuffer);
-
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(),
pageBuffer);
- if (!isOnlyOnePageChunk) {
+ // serialize pageHeader see writePageToPageBuffer method
+ if (numOfPages == 0) { // record the firstPageStatistics
+ this.firstPageStatistics = header.getStatistics();
+ this.sizeWithoutStatistic +=
+
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(),
pageBuffer);
+ this.sizeWithoutStatistic +=
+
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(),
pageBuffer);
+ } else if (numOfPages == 1) { // put the firstPageStatistics into
pageBuffer
+ byte[] b = pageBuffer.toByteArray();
+ pageBuffer.reset();
+ pageBuffer.write(b, 0, this.sizeWithoutStatistic);
+ firstPageStatistics.serialize(pageBuffer);
+ pageBuffer.write(b, this.sizeWithoutStatistic, b.length -
this.sizeWithoutStatistic);
+
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(),
pageBuffer);
+
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(),
pageBuffer);
+ header.getStatistics().serialize(pageBuffer);
+ firstPageStatistics = null;
+ } else {
+
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getUncompressedSize(),
pageBuffer);
+
ReadWriteForEncodingUtils.writeUnsignedVarInt(header.getCompressedSize(),
pageBuffer);
header.getStatistics().serialize(pageBuffer);
}
logger.debug(