This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 848af7209ec Upgrade the compression ratio file name format and fix the inflated compression ratio caused by repeatedly written (duplicate) data (#11220)
848af7209ec is described below

commit 848af7209ec629ca5507c0d5823f35961b64fe60
Author: Zhijia Cao <[email protected]>
AuthorDate: Fri Oct 13 09:49:38 2023 +0800

    Upgrade the compression ratio file name format and fix the inflated compression ratio caused by repeatedly written (duplicate) data (#11220)
---
 .../dataregion/flush/CompressionRatio.java         | 132 ++++++++++++---------
 .../memtable/AlignedWritableMemChunk.java          |  19 ++-
 .../dataregion/memtable/TsFileProcessor.java       |   2 +-
 .../dataregion/memtable/WritableMemChunk.java      |  18 ++-
 .../dataregion/flush/CompressionRatioTest.java     | 108 +++++------------
 5 files changed, 142 insertions(+), 137 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
index cf58b438c01..32e1cf80416 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
@@ -19,11 +19,11 @@
 package org.apache.iotdb.db.storageengine.dataregion.flush;
 
 import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
 
-import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,16 +32,17 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Locale;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This class is used to count, compute and persist the compression ratio of 
tsfiles. Whenever the
  * task of closing a file ends, the compression ratio of the file is 
calculated based on the total
- * MemTable size and the total size of the tsfile on disk. {@code 
compressionRatioSum} records the
- * sum of these compression ratios, and {@Code calcTimes} records the number 
of closed file tasks.
- * When the compression rate of the current system is obtained, the average 
compression ratio is
- * returned as the result, that is {@code compressionRatioSum}/{@Code 
calcTimes}. At the same time,
- * each time the compression ratio statistics are updated, these two 
parameters are persisted on
- * disk for system recovery.
+ * MemTable size and the total size of the tsfile on disk. {@code 
totalMemorySize} records the data
+ * size of memory, and {@code totalDiskSize} records the data size of disk. 
When the compression
+ * rate of the current system is obtained, the average compression ratio is 
returned as the result,
+ * that is {@code totalMemorySize}/{@code totalDiskSize}. At the same time, 
each time the
+ * compression ratio statistics are updated, these two parameters are 
persisted on disk for system
+ * recovery.
  */
 public class CompressionRatio {
 
@@ -51,24 +52,23 @@ public class CompressionRatio {
 
   static final String COMPRESSION_RATIO_DIR = "compression_ratio";
 
-  private static final String FILE_PREFIX = "Ratio-";
+  private static final String FILE_PREFIX_BEFORE_V121 = "Ratio-";
+  private static final String FILE_PREFIX = "Compress-";
 
   private static final String SEPARATOR = "-";
 
-  static final String RATIO_FILE_PATH_FORMAT = FILE_PREFIX + "%f" + SEPARATOR 
+ "%d";
+  static final String RATIO_FILE_PATH_FORMAT = FILE_PREFIX + "%d" + SEPARATOR 
+ "%d";
 
-  private static final double DEFAULT_COMPRESSION_RATIO = 2.0;
+  /** The data size on memory */
+  private static AtomicLong totalMemorySize = new AtomicLong(0);
 
-  private AtomicDouble compressionRatio = new 
AtomicDouble(DEFAULT_COMPRESSION_RATIO);
-
-  /** The total sum of all compression ratios. */
-  private double compressionRatioSum;
-
-  /** The number of compression ratios. */
-  private long calcTimes;
+  /** The data size on disk */
+  private long totalDiskSize = 0L;
 
   private File directory;
 
+  private String oldFileName = String.format(RATIO_FILE_PATH_FORMAT, 0, 0);
+
   private CompressionRatio() {
     directory =
         SystemFileFactory.INSTANCE.getFile(
@@ -83,30 +83,26 @@ public class CompressionRatio {
   /**
    * Whenever the task of closing a file ends, the compression ratio of the 
file is calculated and
    * call this method.
-   *
-   * @param currentCompressionRatio the compression ratio of the closing file.
    */
-  public synchronized void updateRatio(double currentCompressionRatio) throws 
IOException {
-    File oldFile =
-        SystemFileFactory.INSTANCE.getFile(
-            directory,
-            String.format(Locale.ENGLISH, RATIO_FILE_PATH_FORMAT, 
compressionRatioSum, calcTimes));
-    compressionRatioSum += currentCompressionRatio;
-    calcTimes++;
+  public synchronized void updateRatio(long memorySize, long diskSize) throws 
IOException {
+    File oldFile = SystemFileFactory.INSTANCE.getFile(directory, oldFileName);
+
+    totalMemorySize.addAndGet(memorySize);
+    totalDiskSize += diskSize;
     File newFile =
         SystemFileFactory.INSTANCE.getFile(
             directory,
-            String.format(Locale.ENGLISH, RATIO_FILE_PATH_FORMAT, 
compressionRatioSum, calcTimes));
+            String.format(
+                Locale.ENGLISH, RATIO_FILE_PATH_FORMAT, totalMemorySize.get(), 
totalDiskSize));
     persist(oldFile, newFile);
-    compressionRatio.set(compressionRatioSum / calcTimes);
     if (LOGGER.isInfoEnabled()) {
-      LOGGER.info("Compression ratio is {}", compressionRatio.get());
+      LOGGER.info("Compression ratio is {}", (double) totalMemorySize.get() / 
totalDiskSize);
     }
   }
 
   /** Get the average compression ratio for all closed files */
   public double getRatio() {
-    return compressionRatio.get();
+    return (double) totalMemorySize.get() / totalDiskSize;
   }
 
   private void persist(File oldFile, File newFile) throws IOException {
@@ -124,6 +120,7 @@ public class CompressionRatio {
           oldFile.getAbsolutePath(),
           newFile.getAbsolutePath());
     }
+    this.oldFileName = newFile.getName();
   }
 
   private void checkDirectoryExist() throws IOException {
@@ -138,49 +135,70 @@ public class CompressionRatio {
       return;
     }
     File[] ratioFiles = directory.listFiles((dir, name) -> 
name.startsWith(FILE_PREFIX));
+    // First try to recover from the new version of the file, parse the file 
name, and get the file
+    // with the largest disk size value
     if (ratioFiles != null && ratioFiles.length > 0) {
-      long maxTimes = 0;
-      double maxCompressionRatioSum = 0;
       int maxRatioIndex = 0;
       for (int i = 0; i < ratioFiles.length; i++) {
-        String[] splits = ratioFiles[i].getName().split("-");
-        long times = Long.parseLong(splits[2]);
-        if (times > maxTimes) {
-          maxTimes = times;
-          maxCompressionRatioSum = Double.parseDouble(splits[1]);
+        String[] fileNameArray = ratioFiles[i].getName().split("-");
+        long diskSize = Long.parseLong(fileNameArray[2]);
+        if (diskSize > totalDiskSize) {
+          totalMemorySize = new AtomicLong(Long.parseLong(fileNameArray[1]));
+          totalDiskSize = diskSize;
           maxRatioIndex = i;
         }
       }
-      calcTimes = maxTimes;
-      compressionRatioSum = maxCompressionRatioSum;
-      if (calcTimes != 0) {
-        compressionRatio.set(compressionRatioSum / calcTimes);
-      }
       LOGGER.debug(
-          "After restoring from compression ratio file, compressionRatioSum = 
{}, calcTimes = {}",
-          compressionRatioSum,
-          calcTimes);
-      for (int i = 0; i < ratioFiles.length; i++) {
-        if (i != maxRatioIndex) {
-          Files.delete(ratioFiles[i].toPath());
-          ratioFiles[i].delete();
+          "After restoring from compression ratio file, total memory size = 
{}, total disk size = {}",
+          totalMemorySize,
+          totalDiskSize);
+      deleteRedundantFilesByIndex(ratioFiles, maxRatioIndex);
+    } else { // If there is no new file, try to restore from the old version 
file
+      File[] ratioFilesBeforeV121 =
+          directory.listFiles((dir, name) -> 
name.startsWith(FILE_PREFIX_BEFORE_V121));
+      if (ratioFilesBeforeV121 != null && ratioFilesBeforeV121.length > 0) {
+        int maxRatioIndex = 0;
+        totalDiskSize = 1;
+        for (int i = 0; i < ratioFilesBeforeV121.length; i++) {
+          String[] fileNameArray = 
ratioFilesBeforeV121[i].getName().split("-");
+          double currentCompressRatio =
+              Double.parseDouble(fileNameArray[1]) / 
Double.parseDouble(fileNameArray[2]);
+          if (getRatio() < currentCompressRatio) {
+            totalMemorySize = new AtomicLong((long) currentCompressRatio);
+            maxRatioIndex = i;
+          }
         }
+        deleteRedundantFilesByIndex(ratioFilesBeforeV121, maxRatioIndex);
       }
     }
   }
 
-  /** Only for test */
-  void reset() {
-    calcTimes = 0;
-    compressionRatioSum = 0;
+  public static void deleteRedundantFilesByIndex(File[] files, int index) 
throws IOException {
+    for (int i = 0; i < files.length; i++) {
+      if (i != index) {
+        Files.delete(files[i].toPath());
+      }
+    }
   }
 
-  public double getCompressionRatioSum() {
-    return compressionRatioSum;
+  @TestOnly
+  void reset() throws IOException {
+    if (!directory.exists()) {
+      return;
+    }
+    File[] ratioFiles = directory.listFiles((dir, name) -> 
name.startsWith(FILE_PREFIX));
+    if (ratioFiles == null) {
+      return;
+    }
+    for (File file : ratioFiles) {
+      Files.delete(file.toPath());
+    }
+    totalMemorySize = new AtomicLong(0);
+    totalDiskSize = 0L;
   }
 
-  long getCalcTimes() {
-    return calcTimes;
+  public static void decreaseDuplicatedMemorySize(long size) {
+    totalMemorySize.addAndGet(-size);
   }
 
   public static CompressionRatio getInstance() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index aa4b3c4d6ed..a80a40e1995 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -19,8 +19,12 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
+import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -56,6 +60,8 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
 
   private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
 
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
   public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList) {
     this.measurementIndexMap = new LinkedHashMap<>();
     List<TSDataType> dataTypeList = new ArrayList<>();
@@ -362,6 +368,7 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
         for (int sortedRowIndex = pageRange.get(pageNum * 2);
             sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
             sortedRowIndex++) {
+          TSDataType tsDataType = dataTypes.get(columnIndex);
 
           // skip time duplicated rows
           long time = list.getTime(sortedRowIndex);
@@ -371,6 +378,16 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
               lastValidPointIndexForTimeDupCheck.right = 
list.getValueIndex(sortedRowIndex);
             }
             if (timeDuplicateInfo[sortedRowIndex]) {
+              if (!list.isNullValue(sortedRowIndex, columnIndex)) {
+                long recordSize =
+                    MemUtils.getRecordSize(
+                        tsDataType,
+                        tsDataType == TSDataType.TEXT
+                            ? list.getBinaryByValueIndex(sortedRowIndex, 
columnIndex)
+                            : null,
+                        CONFIG.isEnableMemControl());
+                CompressionRatio.decreaseDuplicatedMemorySize(recordSize);
+              }
               continue;
             }
           }
@@ -393,7 +410,7 @@ public class AlignedWritableMemChunk implements 
IWritableMemChunk {
           }
 
           boolean isNull = list.isNullValue(originRowIndex, columnIndex);
-          switch (dataTypes.get(columnIndex)) {
+          switch (tsDataType) {
             case BOOLEAN:
               alignedChunkWriter.writeByColumn(
                   time, list.getBooleanByValueIndex(originRowIndex, 
columnIndex), isNull);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index a1517fadace..6f0e6966b06 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -1335,7 +1335,7 @@ public class TsFileProcessor {
       String dataRegionId = dataRegionInfo.getDataRegion().getDataRegionId();
       WritingMetrics.getInstance()
           .recordTsFileCompressionRatioOfFlushingMemTable(dataRegionId, 
compressionRatio);
-      CompressionRatio.getInstance().updateRatio(compressionRatio);
+      CompressionRatio.getInstance().updateRatio(totalMemTableSize, 
writer.getPos());
     } catch (IOException e) {
       logger.error(
           "{}: {} update compression ratio failed",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 21ba12505b7..8d062396f49 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -18,7 +18,11 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -44,6 +48,8 @@ public class WritableMemChunk implements IWritableMemChunk {
   private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WritableMemChunk.class);
 
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+
   public WritableMemChunk(IMeasurementSchema schema) {
     this.schema = schema;
     this.list = TVList.newList(schema.getType());
@@ -323,8 +329,16 @@ public class WritableMemChunk implements IWritableMemChunk 
{
     for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); 
sortedRowIndex++) {
       long time = list.getTime(sortedRowIndex);
 
+      TSDataType tsDataType = schema.getType();
+
       // skip duplicated data
       if ((sortedRowIndex + 1 < list.rowCount() && (time == 
list.getTime(sortedRowIndex + 1)))) {
+        long recordSize =
+            MemUtils.getRecordSize(
+                tsDataType,
+                tsDataType == TSDataType.TEXT ? list.getBinary(sortedRowIndex) 
: null,
+                CONFIG.isEnableMemControl());
+        CompressionRatio.decreaseDuplicatedMemorySize(recordSize);
         continue;
       }
 
@@ -333,7 +347,7 @@ public class WritableMemChunk implements IWritableMemChunk {
         chunkWriterImpl.setLastPoint(true);
       }
 
-      switch (schema.getType()) {
+      switch (tsDataType) {
         case BOOLEAN:
           chunkWriterImpl.write(time, list.getBoolean(sortedRowIndex));
           break;
@@ -353,7 +367,7 @@ public class WritableMemChunk implements IWritableMemChunk {
           chunkWriterImpl.write(time, list.getBinary(sortedRowIndex));
           break;
         default:
-          LOGGER.error("WritableMemChunk does not support data type: {}", 
schema.getType());
+          LOGGER.error("WritableMemChunk does not support data type: {}", 
tsDataType);
           break;
       }
     }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java
index bfe01387c01..ccbca71a323 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatioTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Locale;
 
 import static org.junit.Assert.assertEquals;
@@ -49,7 +50,6 @@ public class CompressionRatioTest {
     EnvironmentUtils.envSetUp();
     FileUtils.forceMkdir(new File(directory));
     compressionRatio.reset();
-    compressionRatio.restore();
   }
 
   @After
@@ -59,93 +59,49 @@ public class CompressionRatioTest {
 
   @Test
   public void testCompressionRatio() throws IOException {
-    double compressionRatioSum = 0;
-    int calcuTimes = 0;
-    if (new File(
-            directory,
-            String.format(
-                Locale.ENGLISH,
-                CompressionRatio.RATIO_FILE_PATH_FORMAT,
-                compressionRatioSum,
-                calcuTimes))
-        .exists()) {
-      fail();
-    }
-    double compressionRatio = 10;
-    for (int i = 0; i < 500; i += compressionRatio) {
-      this.compressionRatio.updateRatio(compressionRatio);
-      if (new File(
-              directory,
-              String.format(
-                  Locale.ENGLISH,
-                  CompressionRatio.RATIO_FILE_PATH_FORMAT,
-                  compressionRatioSum,
-                  calcuTimes))
-          .exists()) {
-        fail();
-      }
-      calcuTimes++;
-      compressionRatioSum += compressionRatio;
+    long totalMemorySize = 10;
+    long totalDiskSize = 5;
+
+    for (int i = 0; i < 5; i++) {
+      this.compressionRatio.updateRatio(10, 5);
       if (!new File(
               directory,
               String.format(
                   Locale.ENGLISH,
                   CompressionRatio.RATIO_FILE_PATH_FORMAT,
-                  compressionRatioSum,
-                  calcuTimes))
+                  totalMemorySize,
+                  totalDiskSize))
           .exists()) {
         fail();
       }
-      assertEquals(
-          0, Double.compare(compressionRatioSum / calcuTimes, 
this.compressionRatio.getRatio()));
+      assertEquals(2, this.compressionRatio.getRatio(), 0.1);
+      totalMemorySize += 10;
+      totalDiskSize += 5;
     }
   }
 
   @Test
   public void testRestore() throws IOException {
-    double compressionRatioSum = 0;
-    int calcuTimes = 0;
-    if (new File(
-            directory,
-            String.format(
-                Locale.ENGLISH,
-                CompressionRatio.RATIO_FILE_PATH_FORMAT,
-                compressionRatioSum,
-                calcuTimes))
-        .exists()) {
-      fail();
-    }
-    int compressionRatio = 10;
-    for (int i = 0; i < 100; i += compressionRatio) {
-      this.compressionRatio.updateRatio(compressionRatio);
-      if (new File(
-              directory,
-              String.format(
-                  Locale.ENGLISH,
-                  CompressionRatio.RATIO_FILE_PATH_FORMAT,
-                  compressionRatioSum,
-                  calcuTimes))
-          .exists()) {
-        fail();
-      }
-      calcuTimes++;
-      compressionRatioSum += compressionRatio;
-      if (!new File(
-              directory,
-              String.format(
-                  Locale.ENGLISH,
-                  CompressionRatio.RATIO_FILE_PATH_FORMAT,
-                  compressionRatioSum,
-                  calcuTimes))
-          .exists()) {
-        fail();
-      }
-      assertEquals(
-          0, Double.compare(compressionRatioSum / calcuTimes, 
this.compressionRatio.getRatio()));
-    }
-    this.compressionRatio.restore();
-    assertEquals(10, this.compressionRatio.getCalcTimes());
-    assertEquals(
-        0, Double.compare(compressionRatioSum / calcuTimes, 
this.compressionRatio.getRatio()));
+    Files.createFile(
+        new File(
+                directory,
+                String.format(Locale.ENGLISH, 
CompressionRatio.RATIO_FILE_PATH_FORMAT, 1000, 100))
+            .toPath());
+    Files.createFile(
+        new File(
+                directory,
+                String.format(Locale.ENGLISH, 
CompressionRatio.RATIO_FILE_PATH_FORMAT, 1000, 200))
+            .toPath());
+    Files.createFile(
+        new File(
+                directory,
+                String.format(Locale.ENGLISH, 
CompressionRatio.RATIO_FILE_PATH_FORMAT, 1000, 500))
+            .toPath());
+
+    compressionRatio.restore();
+
+    // if multiple files exist in the system due to some exceptions, restore 
the file with the
+    // largest diskSize to the memory
+    assertEquals(2, compressionRatio.getRatio(), 0.1);
   }
 }

Reply via email to