This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c110ab10c03fd88f89f57d8daff081fd09f8c7b7 Author: Caideyipi <[email protected]> AuthorDate: Tue Nov 18 22:48:34 2025 +0800 Load: Fixed the bug that mods is not deleted in load tsFile when there are exceptions & Fixed the potential NPE in air gap agent close() method (#16775) (cherry picked from commit 2c381feda256c8441492203beaf88a0ff2f2b70c) --- .../receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java | 5 ++++- .../PipeTableStatementDataTypeConvertExecutionVisitor.java | 4 ++-- .../PipeTreeStatementDataTypeConvertExecutionVisitor.java | 4 ++-- .../iotdb/db/storageengine/load/LoadTsFileManager.java | 14 ++++++++++++++ 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java index 644bda1835c..1aa828a4f8c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -84,7 +85,9 @@ public class IoTDBAirGapReceiverAgent implements IService { @Override public void stop() { try { - serverSocket.close(); + if (Objects.nonNull(serverSocket)) { + serverSocket.close(); + } } catch (final IOException e) { LOGGER.warn("Failed to close IoTDBAirGapReceiverAgent's server socket", e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java index 4bb1f8d2e17..d79b05f2faf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java @@ -37,10 +37,10 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDevice import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.storageengine.load.LoadTsFileManager; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.tsfile.external.commons.io.FileUtils; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,7 +193,7 @@ public class PipeTableStatementDataTypeConvertExecutionVisitor } if (loadTsFileStatement.isDeleteAfterLoad()) { - loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly); + loadTsFileStatement.getTsFiles().forEach(LoadTsFileManager::cleanTsFile); } LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java index e9707299e09..282b378a2d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java @@ -36,9 +36,9 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDevice import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.storageengine.load.LoadTsFileManager; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.tsfile.external.commons.io.FileUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.record.Tablet; import org.slf4j.Logger; @@ -153,7 +153,7 @@ public class PipeTreeStatementDataTypeConvertExecutionVisitor } if (loadTsFileStatement.isDeleteAfterLoad()) { - loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly); + loadTsFileStatement.getTsFiles().forEach(LoadTsFileManager::cleanTsFile); } LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index f6203be7be3..30bb64cbd31 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -46,6 +46,7 @@ import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent; @@ -400,6 +401,19 @@ public class LoadTsFileManager { } } + public static void cleanTsFile(final File tsFile) { + try { + Files.deleteIfExists(tsFile.toPath()); + Files.deleteIfExists( + new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).toPath()); + Files.deleteIfExists(ModificationFile.getExclusiveMods(tsFile).toPath()); + Files.deleteIfExists( + new File(tsFile.getAbsolutePath() + ModificationFileV1.FILE_SUFFIX).toPath()); + } catch (final IOException e) { + LOGGER.warn("Delete After Loading {} error.", tsFile, e); + } + } + private static class TsFileWriterManager { private final File taskDir;
