This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b2c42b79004 Fix flush error due to compression ratio (#12953)
b2c42b79004 is described below
commit b2c42b790049641641f3e6dfaf50e4fcf3b2ce4b
Author: Haonan <[email protected]>
AuthorDate: Wed Jul 17 13:30:06 2024 +0800
Fix flush error due to compression ratio (#12953)
---
.../dataregion/flush/CompressionRatio.java | 38 ++++++++++++++--------
.../memtable/AlignedWritableMemChunk.java | 13 --------
.../dataregion/memtable/WritableMemChunk.java | 6 ----
.../dataregion/flush/CompressionRatioTest.java | 36 ++++++++++++++++++++
4 files changed, 60 insertions(+), 33 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
index 41fbbb4a80f..6253717386e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
@@ -144,11 +144,18 @@ public class CompressionRatio {
int maxRatioIndex = 0;
for (int i = 0; i < ratioFiles.length; i++) {
String[] fileNameArray = ratioFiles[i].getName().split("-");
- long diskSize = Long.parseLong(fileNameArray[2]);
- if (diskSize > totalDiskSize) {
- totalMemorySize = new AtomicLong(Long.parseLong(fileNameArray[1]));
- totalDiskSize = diskSize;
- maxRatioIndex = i;
+ // fileNameArray.length != 3 means the compression ratio may be
negative, ignore it
+ if (fileNameArray.length == 3) {
+ try {
+ long diskSize = Long.parseLong(fileNameArray[2]);
+ if (diskSize > totalDiskSize) {
+ totalMemorySize = new
AtomicLong(Long.parseLong(fileNameArray[1]));
+ totalDiskSize = diskSize;
+ maxRatioIndex = i;
+ }
+ } catch (NumberFormatException ignore) {
+ // ignore illegal compression file name
+ }
}
}
LOGGER.debug(
@@ -165,11 +172,18 @@ public class CompressionRatio {
totalDiskSize = 1;
for (int i = 0; i < ratioFilesBeforeV121.length; i++) {
String[] fileNameArray =
ratioFilesBeforeV121[i].getName().split("-");
- double currentCompressRatio =
- Double.parseDouble(fileNameArray[1]) /
Double.parseDouble(fileNameArray[2]);
- if (getRatio() < currentCompressRatio) {
- totalMemorySize = new AtomicLong((long) currentCompressRatio);
- maxRatioIndex = i;
+ // fileNameArray.length != 3 means the compression ratio may be
negative, ignore it
+ if (fileNameArray.length == 3) {
+ try {
+ double currentCompressRatio =
+ Double.parseDouble(fileNameArray[1]) /
Double.parseDouble(fileNameArray[2]);
+ if (getRatio() < currentCompressRatio) {
+ totalMemorySize = new AtomicLong((long) currentCompressRatio);
+ maxRatioIndex = i;
+ }
+ } catch (NumberFormatException ignore) {
+ // ignore illegal compression file name
+ }
}
}
deleteRedundantFilesByIndex(ratioFilesBeforeV121, maxRatioIndex);
@@ -201,10 +215,6 @@ public class CompressionRatio {
totalDiskSize = 0L;
}
- public static void decreaseDuplicatedMemorySize(long size) {
- totalMemorySize.addAndGet(-size);
- }
-
public static CompressionRatio getInstance() {
return CompressionRatioHolder.INSTANCE;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index ba78113bd03..77b98b8b70e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -19,10 +19,8 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
-import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
-import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
@@ -390,17 +388,6 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
list.getValueIndex(sortedRowIndex);
}
if (timeDuplicateInfo[sortedRowIndex]) {
- if (!list.isNullValue(list.getValueIndex(sortedRowIndex),
columnIndex)) {
- long recordSize =
- MemUtils.getRecordSize(
- tsDataType,
- tsDataType.isBinary()
- ? list.getBinaryByValueIndex(
- list.getValueIndex(sortedRowIndex),
columnIndex)
- : null,
- true);
- CompressionRatio.decreaseDuplicatedMemorySize(recordSize);
- }
continue;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 3f05af15fb6..bc665cff5cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -20,9 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
-import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.tsfile.enums.TSDataType;
@@ -340,10 +338,6 @@ public class WritableMemChunk implements IWritableMemChunk
{
// skip duplicated data
if ((sortedRowIndex + 1 < list.rowCount() && (time ==
list.getTime(sortedRowIndex + 1)))) {
- long recordSize =
- MemUtils.getRecordSize(
- tsDataType, tsDataType.isBinary() ?
list.getBinary(sortedRowIndex) : null, true);
- CompressionRatio.decreaseDuplicatedMemorySize(recordSize);
continue;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java
index 439d1b81e96..f088fee719a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java
@@ -104,4 +104,40 @@ public class CompressionRatioTest {
// largest diskSize to the memory
assertEquals(2, compressionRatio.getRatio(), 0.1);
}
+
+ @Test
+ public void testRestoreIllegal1() throws IOException {
+ Files.createFile(
+ new File(
+ directory,
+ String.format(Locale.ENGLISH,
CompressionRatio.RATIO_FILE_PATH_FORMAT, 10, 50))
+ .toPath());
+
+ Files.createFile(
+ new File(
+ directory,
+ String.format(Locale.ENGLISH,
CompressionRatio.RATIO_FILE_PATH_FORMAT, -1000, 100))
+ .toPath());
+
+ compressionRatio.restore();
+
+ // if multiple files exist in the system due to some exceptions, restore
the file with the
+ // largest diskSize to the memory
+ assertEquals(0.2, compressionRatio.getRatio(), 0.1);
+ }
+
+ @Test
+ public void testRestoreIllegal2() throws IOException {
+
+ Files.createFile(
+ new File(
+ directory,
+ String.format(Locale.ENGLISH,
CompressionRatio.RATIO_FILE_PATH_FORMAT, -1000, 100))
+ .toPath());
+
+ compressionRatio.restore();
+
+ // if compression ratio from file is negative, assume the compression
ratio is 0 / 0 = NaN
+ assertEquals(Double.NaN, compressionRatio.getRatio(), 0.1);
+ }
}