This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 23105765868 Fix deleting wrong compression ratio file after flush &
delete compression ratio file after region removal (#16335)
23105765868 is described below
commit 2310576586832ddc8088f50628d0775c0dc64206
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Sep 3 17:21:43 2025 +0800
Fix deleting wrong compression ratio file after flush & delete compression
ratio file after region removal (#16335)
---
.../iotdb/it/env/cluster/node/DataNodeWrapper.java | 10 +-
.../java/org/apache/iotdb/db/it/IoTDBMiscIT.java | 116 +++++++++++++++++++++
.../iotdb/db/storageengine/StorageEngine.java | 4 +
.../dataregion/flush/CompressionRatio.java | 26 ++++-
4 files changed, 152 insertions(+), 4 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index 1d65adacf7e..f08c085bc6c 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -157,8 +157,16 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
return getNodePath() + File.separator + "data";
}
+ public String getDataNodeDir() {
+ return getDataDir() + File.separator + "datanode";
+ }
+
public String getWalDir() {
- return getDataDir() + File.separator + "datanode" + File.separator + "wal";
+ return getDataNodeDir() + File.separator + "wal";
+ }
+
+ public String getSystemDir() {
+ return getDataNodeDir() + File.separator + "system";
}
@Override
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMiscIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMiscIT.java
new file mode 100644
index 00000000000..146a95fd024
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBMiscIT.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(IoTDBTestRunner.class)
+public class IoTDBMiscIT {
+
+ @Category({LocalStandaloneIT.class})
+ @Test
+ public void testCompressionRatioFile() throws SQLException {
+ SimpleEnv simpleEnv = new SimpleEnv();
+ simpleEnv.initClusterEnvironment(1, 1);
+
+ DataNodeWrapper nodeWrapper = simpleEnv.getDataNodeWrapper(0);
+ try (Connection connection = simpleEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("insert into
root.comprssion_ratio_file.d1(timestamp,s1) values(1,1.0)");
+ statement.execute("flush");
+ // one global file and two data region file (including one system region)
+ assertEquals(3, collectCompressionRatioFiles(nodeWrapper).size());
+
+ statement.execute("drop database root.comprssion_ratio_file");
+ // one global file and system region file
+ // deleting a file may not be sensed by other processes instantly
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .pollDelay(100, TimeUnit.MILLISECONDS)
+ .until(() -> collectCompressionRatioFiles(nodeWrapper).size() == 2);
+
+ statement.execute("insert into
root.comprssion_ratio_file.d1(timestamp,s1) values(1,1.0)");
+ statement.execute("flush");
+ assertEquals(3, collectCompressionRatioFiles(nodeWrapper).size());
+
+ statement.execute("drop database root.comprssion_ratio_file");
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .pollDelay(100, TimeUnit.MILLISECONDS)
+ .until(() -> collectCompressionRatioFiles(nodeWrapper).size() == 2);
+
+ statement.execute("insert into
root.comprssion_ratio_file.d1(timestamp,s1) values(1,1.0)");
+ statement.execute("flush");
+ assertEquals(3, collectCompressionRatioFiles(nodeWrapper).size());
+
+ statement.execute("insert into
root.comprssion_ratio_file_2.d1(timestamp,s1) values(1,1.0)");
+ statement.execute("flush");
+ assertEquals(4, collectCompressionRatioFiles(nodeWrapper).size());
+
+ statement.execute("drop database root.comprssion_ratio_file");
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .pollDelay(100, TimeUnit.MILLISECONDS)
+ .until(() -> collectCompressionRatioFiles(nodeWrapper).size() == 3);
+
+ statement.execute("drop database root.comprssion_ratio_file_2");
+ Awaitility.await()
+ .atMost(10, TimeUnit.SECONDS)
+ .pollDelay(100, TimeUnit.MILLISECONDS)
+ .until(() -> collectCompressionRatioFiles(nodeWrapper).size() == 2);
+ } finally {
+ simpleEnv.cleanClusterEnvironment();
+ }
+ }
+
+ private List<File> collectCompressionRatioFiles(DataNodeWrapper nodeWrapper)
throws SQLException {
+ String compressionRatioDir =
+ nodeWrapper.getSystemDir() + File.separator +
CompressionRatio.COMPRESSION_RATIO_DIR;
+ File compressionRatioDirFile = new File(compressionRatioDir);
+ if (!compressionRatioDirFile.exists() ||
!compressionRatioDirFile.isDirectory()) {
+ return Collections.emptyList();
+ } else {
+ File[] files =
+ compressionRatioDirFile.listFiles(
+ f -> f.getName().startsWith(CompressionRatio.FILE_PREFIX));
+ return files != null ? Arrays.asList(files) : Collections.emptyList();
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 034d184bd03..0bc3f408bba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -67,6 +67,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairLogg
import
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.UnsortedFileRepairTaskScheduler;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.flush.CloseFileListener;
+import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener;
import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
import
org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy.DirectFlushPolicy;
@@ -770,6 +771,7 @@ public class StorageEngine implements IService {
DataRegion region =
deletingDataRegionMap.computeIfAbsent(regionId, k ->
dataRegionMap.remove(regionId));
if (region != null) {
+ LOGGER.info("Removing data region {}", regionId);
region.markDeleted();
try {
region.abortCompaction();
@@ -813,6 +815,8 @@ public class StorageEngine implements IService {
WRITING_METRICS.removeFlushingMemTableStatusMetrics(regionId);
WRITING_METRICS.removeActiveMemtableCounterMetrics(regionId);
FileMetrics.getInstance().deleteRegion(region.getDatabaseName(),
region.getDataRegionId());
+
CompressionRatio.getInstance().removeDataRegionRatio(String.valueOf(regionId.getId()));
+ LOGGER.info("Removed data region {}", regionId);
} catch (Exception e) {
LOGGER.error(
"Error occurs when deleting data region {}-{}",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
index c22afac0817..724f4d0decf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java
@@ -54,10 +54,10 @@ public class CompressionRatio {
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- static final String COMPRESSION_RATIO_DIR = "compression_ratio";
+ public static final String COMPRESSION_RATIO_DIR = "compression_ratio";
private static final String FILE_PREFIX_BEFORE_V121 = "Ratio-";
- private static final String FILE_PREFIX = "Compress-";
+ public static final String FILE_PREFIX = "Compress-";
private static final String SEPARATOR = "-";
@@ -109,6 +109,7 @@ public class CompressionRatio {
String.format(
Locale.ENGLISH, RATIO_FILE_PATH_FORMAT, totalMemorySize.get(),
totalDiskSize));
persist(oldDataNodeFile, newDataNodeFile);
+ this.oldFileName = newDataNodeFile.getName();
Pair<Long, Long> dataRegionCompressionRatio =
dataRegionRatioMap.computeIfAbsent(dataRegionId, id -> new Pair<>(0L,
0L));
@@ -137,6 +138,26 @@ public class CompressionRatio {
persist(oldDataRegionFile, newDataRegionFile);
}
+ public synchronized void removeDataRegionRatio(String dataRegionId) {
+ Pair<Long, Long> dataRegionCompressionRatio =
dataRegionRatioMap.remove(dataRegionId);
+ if (dataRegionCompressionRatio == null) {
+ return;
+ }
+ File oldDataRegionFile =
+ SystemFileFactory.INSTANCE.getFile(
+ directory,
+ String.format(
+ Locale.ENGLISH,
+ RATIO_FILE_PATH_FORMAT,
+ dataRegionCompressionRatio.getLeft(),
+ dataRegionCompressionRatio.getRight())
+ + "."
+ + dataRegionId);
+ if (!oldDataRegionFile.delete() && oldDataRegionFile.exists()) {
+ LOGGER.warn("Can't delete old data region compression file {}",
oldDataRegionFile);
+ }
+ }
+
/** Get the average compression ratio for all closed files */
public double getRatio() {
return (double) totalMemorySize.get() / totalDiskSize;
@@ -171,7 +192,6 @@ public class CompressionRatio {
oldFile.getAbsolutePath(),
newFile.getAbsolutePath());
}
- this.oldFileName = newFile.getName();
}
private void checkDirectoryExist() throws IOException {