This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 13f84fdf3ec Fix empty tsfile and resource generated when insert, load,
kill -9 and restart (#16215)
13f84fdf3ec is described below
commit 13f84fdf3ecb4122ece9ec5bc995fba86019d615
Author: Haonan <[email protected]>
AuthorDate: Thu Aug 21 09:52:37 2025 +0800
Fix empty tsfile and resource generated when insert, load, kill -9 and
restart (#16215)
* Fix empty tsfile and resource generated when insert, load, kill -9 and
restart
* Fix more
* Add IT
---
.../org/apache/iotdb/db/it/IoTDBRestartIT.java | 59 ++++++++++++++++++++++
.../db/storageengine/dataregion/DataRegion.java | 37 +++++++-------
2 files changed, 79 insertions(+), 17 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java
index 8e2728be485..df4692f3501 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java
@@ -24,9 +24,13 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.it.utils.TsFileGenerator;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -35,10 +39,13 @@ import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.nio.file.Files;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
import static org.junit.Assert.assertEquals;
@@ -371,4 +378,56 @@ public class IoTDBRestartIT {
}
}
}
+
+ @Test
+ public void testInsertLoadAndRecover() throws Exception {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("create timeseries root.sg.d1.s1 with datatype=int32");
+ statement.execute("insert into root.sg.d1(time,s1) values(2,2)");
+ statement.execute("flush");
+ }
+ File tmpDir = new File(Files.createTempDirectory("load").toUri());
+ File tsfile = new File(tmpDir, "0-0-0-0.tsfile");
+ try {
+ try (final TsFileGenerator generator = new TsFileGenerator(tsfile)) {
+ generator.registerTimeseries(
+ "root.sg.d1", Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
+ generator.generateData("root.sg.d1", 1, 2, false);
+ }
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("insert into root.sg.d1(time,s1) values(1,1)");
+ statement.execute(String.format("load \"%s\" ", tsfile.getAbsolutePath()));
+ try (ResultSet resultSet = statement.executeQuery("select s1 from root.sg.d1")) {
+ assertNotNull(resultSet);
+ int cnt = 0;
+ while (resultSet.next()) {
+ assertEquals(String.valueOf(cnt + 1), resultSet.getString(1));
+ cnt++;
+ }
+ assertEquals(2, cnt);
+ }
+ }
+
+ // restart dn
+ TestUtils.stopForciblyAndRestartDataNodes();
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ try (ResultSet resultSet = statement.executeQuery("select s1 from root.sg.d1")) {
+ assertNotNull(resultSet);
+ int cnt = 0;
+ while (resultSet.next()) {
+ assertEquals(String.valueOf(cnt + 1), resultSet.getString(1));
+ cnt++;
+ }
+ assertEquals(2, cnt);
+ }
+ }
+
+ } finally {
+ FileUtils.deleteDirectory(tmpDir);
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 329728ab130..04f959be5b4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -581,9 +581,10 @@ public class DataRegion implements IDataRegionForQuery {
}
}
}
- for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
+ for (List<TsFileResource> unseqTsFiles : partitionTmpUnseqTsFiles.values()) {
+ List<TsFileResource> unsealedTsFiles = new ArrayList<>();
// tsFiles without resource file are unsealed
- for (TsFileResource resource : value) {
+ for (TsFileResource resource : unseqTsFiles) {
if (resource.resourceFileExists()) {
FileMetrics.getInstance()
.addTsFile(
@@ -592,6 +593,13 @@ public class DataRegion implements IDataRegionForQuery {
resource.getTsFile().length(),
false,
resource.getTsFile().getName());
+ } else {
+ WALRecoverListener recoverListener =
+ recoverUnsealedTsFile(resource, dataRegionRecoveryContext, false);
+ if (recoverListener != null) {
+ recoverListeners.add(recoverListener);
+ }
+ unsealedTsFiles.add(resource);
}
if (ModificationFile.getExclusiveMods(resource.getTsFile()).exists()) {
// update mods file metrics
@@ -600,19 +608,7 @@ public class DataRegion implements IDataRegionForQuery {
resource.upgradeModFile(upgradeModFileThreadPool);
}
}
- while (!value.isEmpty()) {
- TsFileResource tsFileResource = value.get(value.size() - 1);
- if (tsFileResource.resourceFileExists()) {
- break;
- } else {
- value.remove(value.size() - 1);
- WALRecoverListener recoverListener =
- recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, false);
- if (recoverListener != null) {
- recoverListeners.add(recoverListener);
- }
- }
- }
+ unseqTsFiles.removeAll(unsealedTsFiles);
}
// signal wal recover manager to recover this region's files
WALRecoverManager.getInstance().getAllDataRegionScannedLatch().countDown();
@@ -964,7 +960,12 @@ public class DataRegion implements IDataRegionForQuery {
new SealedTsFileRecoverPerformer(sealedTsFile)) {
recoverPerformer.recover();
sealedTsFile.close();
- tsFileResourceManager.registerSealedTsFileResource(sealedTsFile);
+ if (!TsFileValidator.getInstance().validateTsFile(sealedTsFile)) {
+ sealedTsFile.remove();
+ tsFileManager.remove(sealedTsFile, sealedTsFile.isSeq());
+ } else {
+ tsFileResourceManager.registerSealedTsFileResource(sealedTsFile);
+ }
} catch (Throwable e) {
logger.error("Fail to recover sealed TsFile {}, skip it.",
sealedTsFile.getTsFilePath(), e);
} finally {
@@ -1075,7 +1076,9 @@ public class DataRegion implements IDataRegionForQuery {
lastFlushTimeMap.getMemSize(partitionId)));
}
for (TsFileResource tsFileResource : resourceList) {
- updateDeviceLastFlushTime(tsFileResource);
+ if (!tsFileResource.isDeleted()) {
+ updateDeviceLastFlushTime(tsFileResource);
+ }
}
TimePartitionManager.getInstance()
.updateAfterFlushing(