This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aaf5139029e refine tsfileWriter reset logic (#16309)
aaf5139029e is described below
commit aaf5139029e7533191701d370338783461afac1d
Author: Peng Junzhi <[email protected]>
AuthorDate: Sat Aug 30 21:11:23 2025 -0500
refine tsfileWriter reset logic (#16309)
Co-authored-by: 彭俊植 <[email protected]>
---
.../pipeconsensus/PipeConsensusReceiver.java | 304 ++++++++-------------
1 file changed, 116 insertions(+), 188 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 9549eba9adb..2b116d2e4d8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -448,17 +448,8 @@ public class PipeConsensusReceiver {
writingFileWriter.getFD().sync();
// 1. The writing file writer must be closed, otherwise it may cause
concurrent errors during
// the process of loading tsfile when parsing tsfile.
- //
- // 2. The writing file must be set to null, otherwise if the next passed
tsfile has the same
- // name as the current tsfile, it will bypass the judgment logic of
- // updateWritingFileIfNeeded#isFileExistedAndNameCorrect, and continue
to write to the already
- // loaded file. Since the writing file writer has already been closed,
it will throw a Stream
- // Close exception.
+ // 2. writingFileWriter and writingFile will be reset in `releaseTsFileWriter`
writingFileWriter.close();
- tsFileWriter.setWritingFileWriter(null);
-
- // writingFile will be deleted after load if no exception occurs
- tsFileWriter.setWritingFile(null);
long endPreCheckNanos = System.nanoTime();
pipeConsensusReceiverMetrics.recordTsFileSealPreCheckTimer(
@@ -567,17 +558,8 @@ public class PipeConsensusReceiver {
writingFileWriter.getFD().sync();
// 1. The writing file writer must be closed, otherwise it may cause
concurrent errors during
// the process of loading tsfile when parsing tsfile.
- //
- // 2. The writing file must be set to null, otherwise if the next passed
tsfile has the same
- // name as the current tsfile, it will bypass the judgment logic of
- // updateWritingFileIfNeeded#isFileExistedAndNameCorrect, and continue
to write to the already
- // loaded file. Since the writing file writer has already been closed,
it will throw a Stream
- // Close exception.
+ // 2. writingFileWriter and writingFile will be reset in `releaseTsFileWriter`
writingFileWriter.close();
- tsFileWriter.setWritingFileWriter(null);
-
- // WritingFile will be deleted after load if no exception occurs
- tsFileWriter.setWritingFile(null);
final List<String> fileAbsolutePaths =
files.stream().map(File::getAbsolutePath).collect(Collectors.toList());
@@ -628,24 +610,6 @@ public class PipeConsensusReceiver {
}
}
- private void releaseTsFileWriter(
- PipeConsensusTsFileWriter tsFileWriter, boolean fsyncBeforeClose) {
- if (tsFileWriter == null) {
- return;
- }
- closeCurrentWritingFileWriter(tsFileWriter, fsyncBeforeClose);
- deleteCurrentWritingFile(tsFileWriter);
- try {
- tsFileWriter.returnSelf(consensusPipeName);
- } catch (IOException | DiskSpaceInsufficientException e) {
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.",
- consensusPipeName,
- tsFileWriter,
- e);
- }
- }
-
private TPipeConsensusTransferResp checkNonFinalFileSeal(
final PipeConsensusTsFileWriter tsFileWriter,
final File file,
@@ -846,100 +810,13 @@ public class PipeConsensusReceiver {
return !offsetCorrect;
}
- private void closeCurrentWritingFileWriter(
- PipeConsensusTsFileWriter tsFileWriter, boolean fsyncBeforeClose) {
- if (tsFileWriter.getWritingFileWriter() != null) {
- try {
- if (fsyncBeforeClose) {
- tsFileWriter.getWritingFileWriter().getFD().sync();
- }
- tsFileWriter.getWritingFileWriter().close();
- LOGGER.info(
- "PipeConsensus-PipeName-{}: Current writing file writer {} was
closed.",
- consensusPipeName,
- tsFileWriter.getWritingFile() == null
- ? "null"
- : tsFileWriter.getWritingFile().getPath());
- tsFileWriter.setWritingFileWriter(null);
- } catch (IOException e) {
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: Failed to close current writing file
writer {}, because {}.",
- consensusPipeName,
- tsFileWriter.getWritingFile() == null
- ? "null"
- : tsFileWriter.getWritingFile().getPath(),
- e.getMessage(),
- e);
- }
- } else {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "PipeConsensus-PipeName-{}: Current writing file writer is null.
No need to close.",
- consensusPipeName.toString());
- }
- }
- }
-
- private void deleteFileOrDirectoryIfExists(File file, String reason) {
- if (file.exists()) {
- try {
- if (file.isDirectory()) {
- RetryUtils.retryOnException(
- () -> {
- FileUtils.deleteDirectory(file);
- return null;
- });
- } else {
- RetryUtils.retryOnException(() -> FileUtils.delete(file));
- }
- LOGGER.info(
- "PipeConsensus-PipeName-{}: {} {} was deleted.",
- consensusPipeName,
- reason,
- file.getPath());
- } catch (IOException e) {
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: {} Failed to delete {}, because {}.",
- consensusPipeName,
- reason,
- file.getPath(),
- e.getMessage(),
- e);
- }
- } else {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "PipeConsensus-PipeName-{}: {} {} is not existed. No need to
delete.",
- consensusPipeName,
- reason,
- file.getPath());
- }
- }
- }
-
- private void deleteCurrentWritingFile(PipeConsensusTsFileWriter
tsFileWriter) {
- if (tsFileWriter.getLocalWritingDir() != null) {
- try {
- // There may be multiple files such as mods and tsfile pieces in the
dir. Here we clean the
- // dir instead of deleting it to avoid repeatedly deleting and
creating the base dir for
- // tsfile writer
- FileUtils.cleanDirectory(tsFileWriter.getLocalWritingDir());
- } catch (IOException e) {
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: Failed to clean current writing file
dir {}.",
- consensusPipeName,
- tsFileWriter.getLocalWritingDir().getPath(),
- e);
- }
- }
- }
-
private void updateWritingFileIfNeeded(
final PipeConsensusTsFileWriter tsFileWriter,
final String fileName,
final boolean isSingleFile)
throws IOException {
- if (isFileExistedAndNameCorrect(tsFileWriter, fileName)) {
+ if (isFileExistedAndNameCorrect(tsFileWriter, fileName)
+ && tsFileWriter.getWritingFileWriter() != null) {
return;
}
@@ -954,7 +831,10 @@ public class PipeConsensusReceiver {
// If there are multiple files we can not delete the current file
// instead they will be deleted after seal request
if (tsFileWriter.getWritingFile() != null && isSingleFile) {
- deleteCurrentWritingFile(tsFileWriter);
+ deleteFileOrDirectoryIfExists(
+ tsFileWriter.getWritingFile(),
+ false,
+ String.format("Update TsFileWriter-%s", tsFileWriter.index));
}
// Make sure receiver file dir exists
@@ -1002,7 +882,8 @@ public class PipeConsensusReceiver {
consensusPipeName, newReceiverDir.getPath()));
}
// Remove exists dir
- deleteFileOrDirectoryIfExists(newReceiverDir, "Initial Receiver: delete
origin receive dir");
+ deleteFileOrDirectoryIfExists(
+ newReceiverDir, true, "Initial Receiver: delete origin receive dir");
if (!newReceiverDir.mkdirs()) {
LOGGER.warn(
@@ -1022,7 +903,7 @@ public class PipeConsensusReceiver {
// Clear the original receiver file dir if exists
for (String receiverFileBaseDir : receiveDirs) {
File receiverDir = new File(receiverFileBaseDir);
- deleteFileOrDirectoryIfExists(receiverDir, "Clear receive dir manually");
+ deleteFileOrDirectoryIfExists(receiverDir, true, "Clear receive dir
manually");
}
}
@@ -1146,20 +1027,11 @@ public class PipeConsensusReceiver {
writer -> {
if (System.currentTimeMillis() - writer.lastUsedTs
>= IOTDB_CONFIG.getTsFileWriterZombieThreshold()) {
- try {
- writer.closeSelf(consensusPipeName);
- writer.returnSelf(consensusPipeName);
- LOGGER.info(
- "PipeConsensus-PipeName-{}: tsfile writer-{} is
cleaned up because no new requests were received for too long.",
- consensusPipeName,
- writer.index);
- } catch (IOException | DiskSpaceInsufficientException e) {
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: receiver watch dog failed
to return tsFileWriter-{}.",
- consensusPipeName.toString(),
- writer.index,
- e);
- }
+ releaseTsFileWriter(writer, false);
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: tsfile writer-{} is cleaned
up because no new requests were received for too long.",
+ consensusPipeName,
+ writer.index);
}
});
}
@@ -1183,17 +1055,7 @@ public class PipeConsensusReceiver {
break;
}
}
-
- try {
- tsFileWriter.closeSelf(consensusPipeName);
- tsFileWriter.returnSelf(consensusPipeName);
- } catch (IOException | DiskSpaceInsufficientException e) {
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: receiver thread failed to return
tsFileWriter-{} when exiting.",
- consensusPipeName.toString(),
- tsFileWriter.index,
- e);
- }
+ releaseTsFileWriter(tsFileWriter, false);
});
}
}
@@ -1234,6 +1096,7 @@ public class PipeConsensusReceiver {
File writingDir = new File(receiverBasePath + File.separator +
index);
deleteFileOrDirectoryIfExists(
writingDir,
+ true,
String.format(
"TsFileWriter-%s roll to new dir and delete last
writing dir", index));
@@ -1339,45 +1202,110 @@ public class PipeConsensusReceiver {
consensusPipeName.toString(),
index);
}
+ }
- public void closeSelf(ConsensusPipeName consensusPipeName) {
- // close file writer
- if (writingFileWriter != null) {
- try {
- writingFileWriter.close();
- LOGGER.info(
- "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file
writer was closed.",
- consensusPipeName.toString(),
- index);
- setWritingFileWriter(null);
- } catch (Exception e) {
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Close Writing
file writer error.",
- consensusPipeName,
- index,
- e);
- }
- } else {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file
writer is null. No need to close.",
- consensusPipeName.toString(),
- index);
+ private void closeCurrentWritingFileWriter(
+ PipeConsensusTsFileWriter tsFileWriter, boolean fsyncBeforeClose) {
+ if (tsFileWriter.getWritingFileWriter() != null) {
+ try {
+ if (fsyncBeforeClose) {
+ tsFileWriter.getWritingFileWriter().getFD().sync();
}
+ tsFileWriter.getWritingFileWriter().close();
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: Current writing file writer {} was
closed.",
+ consensusPipeName,
+ tsFileWriter.getWritingFile() == null
+ ? "null"
+ : tsFileWriter.getWritingFile().getPath());
+ tsFileWriter.setWritingFileWriter(null);
+ } catch (IOException e) {
+ LOGGER.warn(
+ "PipeConsensus-PipeName-{}: Failed to close current writing file
writer {}, because {}.",
+ consensusPipeName,
+ tsFileWriter.getWritingFile() == null
+ ? "null"
+ : tsFileWriter.getWritingFile().getPath(),
+ e.getMessage(),
+ e);
}
+ } else {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "PipeConsensus-PipeName-{}: Current writing file writer is null.
No need to close.",
+ consensusPipeName.toString());
+ }
+ }
+ }
- // close file
- if (writingFile != null) {
- deleteFileOrDirectoryIfExists(
- writingFile, String.format("TsFileWriter-%s exit: delete writing
file", this.index));
- setWritingFile(null);
- } else {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file is
null. No need to delete.",
- consensusPipeName.toString());
+ private void deleteFileOrDirectoryIfExists(File file, boolean deleteDir,
String reason) {
+ if (file.exists()) {
+ try {
+ if (file.isDirectory()) {
+ if (deleteDir) {
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.deleteDirectory(file);
+ return null;
+ });
+ } else {
+ // There may be multiple files such as mods and tsfile pieces in
the dir. Here we clean
+ // the
+ // dir instead of deleting it to avoid repeatedly deleting and
creating the base dir for
+ // tsfile writer
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.cleanDirectory(file);
+ return null;
+ });
+ }
+ } else {
+ RetryUtils.retryOnException(() -> FileUtils.delete(file));
}
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: {} {} was deleted.",
+ consensusPipeName,
+ reason,
+ file.getPath());
+ } catch (IOException e) {
+ LOGGER.warn(
+ "PipeConsensus-PipeName-{}: {} Failed to delete {}, because {}.",
+ consensusPipeName,
+ reason,
+ file.getPath(),
+ e.getMessage(),
+ e);
}
+ } else {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "PipeConsensus-PipeName-{}: {} {} is not existed. No need to
delete.",
+ consensusPipeName,
+ reason,
+ file.getPath());
+ }
+ }
+ }
+
+ private void releaseTsFileWriter(
+ PipeConsensusTsFileWriter tsFileWriter, boolean fsyncBeforeClose) {
+ if (tsFileWriter == null) {
+ return;
+ }
+ closeCurrentWritingFileWriter(tsFileWriter, fsyncBeforeClose);
+ deleteFileOrDirectoryIfExists(
+ tsFileWriter.getLocalWritingDir(),
+ false,
+ String.format("Release TsFileWriter-%s", tsFileWriter.index));
+ tsFileWriter.setWritingFile(null);
+ try {
+ tsFileWriter.returnSelf(consensusPipeName);
+ } catch (IOException | DiskSpaceInsufficientException e) {
+ LOGGER.warn(
+ "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.",
+ consensusPipeName,
+ tsFileWriter,
+ e);
}
}