This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 848af7209ec upgrade compression ratio file name format and solve the
problem of high compression ratio caused by repeated data writing (#11220)
848af7209ec is described below
commit 848af7209ec629ca5507c0d5823f35961b64fe60
Author: Zhijia Cao <[email protected]>
AuthorDate: Fri Oct 13 09:49:38 2023 +0800
upgrade compression ratio file name format and solve the problem of high
compression ratio caused by repeated data writing (#11220)
---
.../dataregion/flush/CompressionRatio.java | 132 ++++++++++++---------
.../memtable/AlignedWritableMemChunk.java | 19 ++-
.../dataregion/memtable/TsFileProcessor.java | 2 +-
.../dataregion/memtable/WritableMemChunk.java | 18 ++-
.../dataregion/flush/CompressionRatioTest.java | 108 +++++------------
5 files changed, 142 insertions(+), 137 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
index cf58b438c01..32e1cf80416 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.db.storageengine.dataregion.flush;
import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
-import com.google.common.util.concurrent.AtomicDouble;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,16 +32,17 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Locale;
+import java.util.concurrent.atomic.AtomicLong;
/**
* This class is used to count, compute and persist the compression ratio of
tsfiles. Whenever the
* task of closing a file ends, the compression ratio of the file is
calculated based on the total
- * MemTable size and the total size of the tsfile on disk. {@code
compressionRatioSum} records the
- * sum of these compression ratios, and {@Code calcTimes} records the number
of closed file tasks.
- * When the compression rate of the current system is obtained, the average
compression ratio is
- * returned as the result, that is {@code compressionRatioSum}/{@Code
calcTimes}. At the same time,
- * each time the compression ratio statistics are updated, these two
parameters are persisted on
- * disk for system recovery.
+ * MemTable size and the total size of the tsfile on disk. {@code
totalMemorySize} records the data
+ * size of memory, and {@code totalDiskSize} records the data size of disk.
When the compression
+ * rate of the current system is obtained, the average compression ratio is
returned as the result,
+ * that is {@code totalMemorySize}/{@code totalDiskSize}. At the same time,
each time the
+ * compression ratio statistics are updated, these two parameters are
persisted on disk for system
+ * recovery.
*/
public class CompressionRatio {
@@ -51,24 +52,23 @@ public class CompressionRatio {
static final String COMPRESSION_RATIO_DIR = "compression_ratio";
- private static final String FILE_PREFIX = "Ratio-";
+ private static final String FILE_PREFIX_BEFORE_V121 = "Ratio-";
+ private static final String FILE_PREFIX = "Compress-";
private static final String SEPARATOR = "-";
- static final String RATIO_FILE_PATH_FORMAT = FILE_PREFIX + "%f" + SEPARATOR
+ "%d";
+ static final String RATIO_FILE_PATH_FORMAT = FILE_PREFIX + "%d" + SEPARATOR
+ "%d";
- private static final double DEFAULT_COMPRESSION_RATIO = 2.0;
+ /** The data size in memory */
+ private static AtomicLong totalMemorySize = new AtomicLong(0);
- private AtomicDouble compressionRatio = new
AtomicDouble(DEFAULT_COMPRESSION_RATIO);
-
- /** The total sum of all compression ratios. */
- private double compressionRatioSum;
-
- /** The number of compression ratios. */
- private long calcTimes;
+ /** The data size on disk */
+ private long totalDiskSize = 0L;
private File directory;
+ private String oldFileName = String.format(RATIO_FILE_PATH_FORMAT, 0, 0);
+
private CompressionRatio() {
directory =
SystemFileFactory.INSTANCE.getFile(
@@ -83,30 +83,26 @@ public class CompressionRatio {
/**
* Whenever the task of closing a file ends, the compression ratio of the
file is calculated and
* this method is called.
- *
- * @param currentCompressionRatio the compression ratio of the closing file.
*/
- public synchronized void updateRatio(double currentCompressionRatio) throws
IOException {
- File oldFile =
- SystemFileFactory.INSTANCE.getFile(
- directory,
- String.format(Locale.ENGLISH, RATIO_FILE_PATH_FORMAT,
compressionRatioSum, calcTimes));
- compressionRatioSum += currentCompressionRatio;
- calcTimes++;
+ public synchronized void updateRatio(long memorySize, long diskSize) throws
IOException {
+ File oldFile = SystemFileFactory.INSTANCE.getFile(directory, oldFileName);
+
+ totalMemorySize.addAndGet(memorySize);
+ totalDiskSize += diskSize;
File newFile =
SystemFileFactory.INSTANCE.getFile(
directory,
- String.format(Locale.ENGLISH, RATIO_FILE_PATH_FORMAT,
compressionRatioSum, calcTimes));
+ String.format(
+ Locale.ENGLISH, RATIO_FILE_PATH_FORMAT, totalMemorySize.get(),
totalDiskSize));
persist(oldFile, newFile);
- compressionRatio.set(compressionRatioSum / calcTimes);
if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Compression ratio is {}", compressionRatio.get());
+ LOGGER.info("Compression ratio is {}", (double) totalMemorySize.get() /
totalDiskSize);
}
}
/** Get the average compression ratio for all closed files */
public double getRatio() {
- return compressionRatio.get();
+ return (double) totalMemorySize.get() / totalDiskSize;
}
private void persist(File oldFile, File newFile) throws IOException {
@@ -124,6 +120,7 @@ public class CompressionRatio {
oldFile.getAbsolutePath(),
newFile.getAbsolutePath());
}
+ this.oldFileName = newFile.getName();
}
private void checkDirectoryExist() throws IOException {
@@ -138,49 +135,70 @@ public class CompressionRatio {
return;
}
File[] ratioFiles = directory.listFiles((dir, name) ->
name.startsWith(FILE_PREFIX));
+ // First try to recover from the new version of the file, parse the file
name, and get the file
+ // with the largest disk size value
if (ratioFiles != null && ratioFiles.length > 0) {
- long maxTimes = 0;
- double maxCompressionRatioSum = 0;
int maxRatioIndex = 0;
for (int i = 0; i < ratioFiles.length; i++) {
- String[] splits = ratioFiles[i].getName().split("-");
- long times = Long.parseLong(splits[2]);
- if (times > maxTimes) {
- maxTimes = times;
- maxCompressionRatioSum = Double.parseDouble(splits[1]);
+ String[] fileNameArray = ratioFiles[i].getName().split("-");
+ long diskSize = Long.parseLong(fileNameArray[2]);
+ if (diskSize > totalDiskSize) {
+ totalMemorySize = new AtomicLong(Long.parseLong(fileNameArray[1]));
+ totalDiskSize = diskSize;
maxRatioIndex = i;
}
}
- calcTimes = maxTimes;
- compressionRatioSum = maxCompressionRatioSum;
- if (calcTimes != 0) {
- compressionRatio.set(compressionRatioSum / calcTimes);
- }
LOGGER.debug(
- "After restoring from compression ratio file, compressionRatioSum =
{}, calcTimes = {}",
- compressionRatioSum,
- calcTimes);
- for (int i = 0; i < ratioFiles.length; i++) {
- if (i != maxRatioIndex) {
- Files.delete(ratioFiles[i].toPath());
- ratioFiles[i].delete();
+ "After restoring from compression ratio file, total memory size =
{}, total disk size = {}",
+ totalMemorySize,
+ totalDiskSize);
+ deleteRedundantFilesByIndex(ratioFiles, maxRatioIndex);
+ } else { // If there is no new file, try to restore from the old version
file
+ File[] ratioFilesBeforeV121 =
+ directory.listFiles((dir, name) ->
name.startsWith(FILE_PREFIX_BEFORE_V121));
+ if (ratioFilesBeforeV121 != null && ratioFilesBeforeV121.length > 0) {
+ int maxRatioIndex = 0;
+ totalDiskSize = 1;
+ for (int i = 0; i < ratioFilesBeforeV121.length; i++) {
+ String[] fileNameArray =
ratioFilesBeforeV121[i].getName().split("-");
+ double currentCompressRatio =
+ Double.parseDouble(fileNameArray[1]) /
Double.parseDouble(fileNameArray[2]);
+ if (getRatio() < currentCompressRatio) {
+ totalMemorySize = new AtomicLong((long) currentCompressRatio);
+ maxRatioIndex = i;
+ }
}
+ deleteRedundantFilesByIndex(ratioFilesBeforeV121, maxRatioIndex);
}
}
}
- /** Only for test */
- void reset() {
- calcTimes = 0;
- compressionRatioSum = 0;
+ public static void deleteRedundantFilesByIndex(File[] files, int index)
throws IOException {
+ for (int i = 0; i < files.length; i++) {
+ if (i != index) {
+ Files.delete(files[i].toPath());
+ }
+ }
}
- public double getCompressionRatioSum() {
- return compressionRatioSum;
+ @TestOnly
+ void reset() throws IOException {
+ if (!directory.exists()) {
+ return;
+ }
+ File[] ratioFiles = directory.listFiles((dir, name) ->
name.startsWith(FILE_PREFIX));
+ if (ratioFiles == null) {
+ return;
+ }
+ for (File file : ratioFiles) {
+ Files.delete(file.toPath());
+ }
+ totalMemorySize = new AtomicLong(0);
+ totalDiskSize = 0L;
}
- long getCalcTimes() {
- return calcTimes;
+ public static void decreaseDuplicatedMemorySize(long size) {
+ totalMemorySize.addAndGet(-size);
}
public static CompressionRatio getInstance() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index aa4b3c4d6ed..a80a40e1995 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -19,8 +19,12 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
+import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -56,6 +60,8 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+
public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList) {
this.measurementIndexMap = new LinkedHashMap<>();
List<TSDataType> dataTypeList = new ArrayList<>();
@@ -362,6 +368,7 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
for (int sortedRowIndex = pageRange.get(pageNum * 2);
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
sortedRowIndex++) {
+ TSDataType tsDataType = dataTypes.get(columnIndex);
// skip time duplicated rows
long time = list.getTime(sortedRowIndex);
@@ -371,6 +378,16 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
lastValidPointIndexForTimeDupCheck.right =
list.getValueIndex(sortedRowIndex);
}
if (timeDuplicateInfo[sortedRowIndex]) {
+ if (!list.isNullValue(sortedRowIndex, columnIndex)) {
+ long recordSize =
+ MemUtils.getRecordSize(
+ tsDataType,
+ tsDataType == TSDataType.TEXT
+ ? list.getBinaryByValueIndex(sortedRowIndex,
columnIndex)
+ : null,
+ CONFIG.isEnableMemControl());
+ CompressionRatio.decreaseDuplicatedMemorySize(recordSize);
+ }
continue;
}
}
@@ -393,7 +410,7 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
}
boolean isNull = list.isNullValue(originRowIndex, columnIndex);
- switch (dataTypes.get(columnIndex)) {
+ switch (tsDataType) {
case BOOLEAN:
alignedChunkWriter.writeByColumn(
time, list.getBooleanByValueIndex(originRowIndex,
columnIndex), isNull);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index a1517fadace..6f0e6966b06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1335,7 +1335,7 @@ public class TsFileProcessor {
String dataRegionId = dataRegionInfo.getDataRegion().getDataRegionId();
WritingMetrics.getInstance()
.recordTsFileCompressionRatioOfFlushingMemTable(dataRegionId,
compressionRatio);
- CompressionRatio.getInstance().updateRatio(compressionRatio);
+ CompressionRatio.getInstance().updateRatio(totalMemTableSize,
writer.getPos());
} catch (IOException e) {
logger.error(
"{}: {} update compression ratio failed",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 21ba12505b7..8d062396f49 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -18,7 +18,11 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -44,6 +48,8 @@ public class WritableMemChunk implements IWritableMemChunk {
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
private static final Logger LOGGER =
LoggerFactory.getLogger(WritableMemChunk.class);
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+
public WritableMemChunk(IMeasurementSchema schema) {
this.schema = schema;
this.list = TVList.newList(schema.getType());
@@ -323,8 +329,16 @@ public class WritableMemChunk implements IWritableMemChunk
{
for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount();
sortedRowIndex++) {
long time = list.getTime(sortedRowIndex);
+ TSDataType tsDataType = schema.getType();
+
// skip duplicated data
if ((sortedRowIndex + 1 < list.rowCount() && (time ==
list.getTime(sortedRowIndex + 1)))) {
+ long recordSize =
+ MemUtils.getRecordSize(
+ tsDataType,
+ tsDataType == TSDataType.TEXT ? list.getBinary(sortedRowIndex)
: null,
+ CONFIG.isEnableMemControl());
+ CompressionRatio.decreaseDuplicatedMemorySize(recordSize);
continue;
}
@@ -333,7 +347,7 @@ public class WritableMemChunk implements IWritableMemChunk {
chunkWriterImpl.setLastPoint(true);
}
- switch (schema.getType()) {
+ switch (tsDataType) {
case BOOLEAN:
chunkWriterImpl.write(time, list.getBoolean(sortedRowIndex));
break;
@@ -353,7 +367,7 @@ public class WritableMemChunk implements IWritableMemChunk {
chunkWriterImpl.write(time, list.getBinary(sortedRowIndex));
break;
default:
- LOGGER.error("WritableMemChunk does not support data type: {}",
schema.getType());
+ LOGGER.error("WritableMemChunk does not support data type: {}",
tsDataType);
break;
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java
index bfe01387c01..ccbca71a323 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.Locale;
import static org.junit.Assert.assertEquals;
@@ -49,7 +50,6 @@ public class CompressionRatioTest {
EnvironmentUtils.envSetUp();
FileUtils.forceMkdir(new File(directory));
compressionRatio.reset();
- compressionRatio.restore();
}
@After
@@ -59,93 +59,49 @@ public class CompressionRatioTest {
@Test
public void testCompressionRatio() throws IOException {
- double compressionRatioSum = 0;
- int calcuTimes = 0;
- if (new File(
- directory,
- String.format(
- Locale.ENGLISH,
- CompressionRatio.RATIO_FILE_PATH_FORMAT,
- compressionRatioSum,
- calcuTimes))
- .exists()) {
- fail();
- }
- double compressionRatio = 10;
- for (int i = 0; i < 500; i += compressionRatio) {
- this.compressionRatio.updateRatio(compressionRatio);
- if (new File(
- directory,
- String.format(
- Locale.ENGLISH,
- CompressionRatio.RATIO_FILE_PATH_FORMAT,
- compressionRatioSum,
- calcuTimes))
- .exists()) {
- fail();
- }
- calcuTimes++;
- compressionRatioSum += compressionRatio;
+ long totalMemorySize = 10;
+ long totalDiskSize = 5;
+
+ for (int i = 0; i < 5; i++) {
+ this.compressionRatio.updateRatio(10, 5);
if (!new File(
directory,
String.format(
Locale.ENGLISH,
CompressionRatio.RATIO_FILE_PATH_FORMAT,
- compressionRatioSum,
- calcuTimes))
+ totalMemorySize,
+ totalDiskSize))
.exists()) {
fail();
}
- assertEquals(
- 0, Double.compare(compressionRatioSum / calcuTimes,
this.compressionRatio.getRatio()));
+ assertEquals(2, this.compressionRatio.getRatio(), 0.1);
+ totalMemorySize += 10;
+ totalDiskSize += 5;
}
}
@Test
public void testRestore() throws IOException {
- double compressionRatioSum = 0;
- int calcuTimes = 0;
- if (new File(
- directory,
- String.format(
- Locale.ENGLISH,
- CompressionRatio.RATIO_FILE_PATH_FORMAT,
- compressionRatioSum,
- calcuTimes))
- .exists()) {
- fail();
- }
- int compressionRatio = 10;
- for (int i = 0; i < 100; i += compressionRatio) {
- this.compressionRatio.updateRatio(compressionRatio);
- if (new File(
- directory,
- String.format(
- Locale.ENGLISH,
- CompressionRatio.RATIO_FILE_PATH_FORMAT,
- compressionRatioSum,
- calcuTimes))
- .exists()) {
- fail();
- }
- calcuTimes++;
- compressionRatioSum += compressionRatio;
- if (!new File(
- directory,
- String.format(
- Locale.ENGLISH,
- CompressionRatio.RATIO_FILE_PATH_FORMAT,
- compressionRatioSum,
- calcuTimes))
- .exists()) {
- fail();
- }
- assertEquals(
- 0, Double.compare(compressionRatioSum / calcuTimes,
this.compressionRatio.getRatio()));
- }
- this.compressionRatio.restore();
- assertEquals(10, this.compressionRatio.getCalcTimes());
- assertEquals(
- 0, Double.compare(compressionRatioSum / calcuTimes,
this.compressionRatio.getRatio()));
+ Files.createFile(
+ new File(
+ directory,
+ String.format(Locale.ENGLISH,
CompressionRatio.RATIO_FILE_PATH_FORMAT, 1000, 100))
+ .toPath());
+ Files.createFile(
+ new File(
+ directory,
+ String.format(Locale.ENGLISH,
CompressionRatio.RATIO_FILE_PATH_FORMAT, 1000, 200))
+ .toPath());
+ Files.createFile(
+ new File(
+ directory,
+ String.format(Locale.ENGLISH,
CompressionRatio.RATIO_FILE_PATH_FORMAT, 1000, 500))
+ .toPath());
+
+ compressionRatio.restore();
+
+ // If multiple files exist in the system due to some exception, restore
the file with the
+ largest diskSize into memory
+ assertEquals(2, compressionRatio.getRatio(), 0.1);
}
}