This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b4852c91c52 IoTV2: Try to fix tsfile corruption & receiver dir clean
(#15410)
b4852c91c52 is described below
commit b4852c91c52ddad20815d5fb3ce027768b1b0d9b
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Apr 28 15:11:25 2025 +0800
IoTV2: Try to fix tsfile corruption & receiver dir clean (#15410)
* Try to fix tsfile corruption & receiver dir clean
* improve
* typo
---
.../pipe/consensuspipe/ConsensusPipeReceiver.java | 3 +-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 26 --
.../pipeconsensus/PipeConsensusReceiver.java | 327 ++++++++++-----------
.../pipeconsensus/PipeConsensusReceiverAgent.java | 61 ++--
.../iotdb/db/storageengine/StorageEngine.java | 1 +
5 files changed, 198 insertions(+), 220 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeReceiver.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeReceiver.java
index ba0c89fbc6f..d5be57682ca 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeReceiver.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeReceiver.java
@@ -19,11 +19,12 @@
package org.apache.iotdb.consensus.pipe.consensuspipe;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
public interface ConsensusPipeReceiver {
TPipeConsensusTransferResp receive(TPipeConsensusTransferReq req);
- void handleDropPipeConsensusTask(ConsensusPipeName pipeName);
+ void releaseReceiverResource(DataRegionId regionId);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 96fdbe36209..df196eabaf8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -331,28 +331,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
- if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
- // Release corresponding receiver's resource
- PipeDataNodeAgent.receiver()
- .pipeConsensus()
- .handleDropPipeConsensusTask(new ConsensusPipeName(pipeName));
- }
-
return true;
}
- @Override
- protected boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws
IllegalPathException {
- String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName();
- if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
- // Release corresponding receiver's resource
- PipeDataNodeAgent.receiver()
- .pipeConsensus()
- .markConsensusPipeAsCreated(new ConsensusPipeName(pipeName));
- }
- return super.createPipe(pipeMetaFromCoordinator);
- }
-
@Override
protected boolean dropPipe(final String pipeName) {
// Get the pipe meta first because it is removed after
super#dropPipe(pipeName)
@@ -369,13 +350,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
}
- if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
- // Release corresponding receiver's resource
- PipeDataNodeAgent.receiver()
- .pipeConsensus()
- .handleDropPipeConsensusTask(new ConsensusPipeName(pipeName));
- }
-
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 5f353a61445..d01d7315b95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -91,7 +91,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class PipeConsensusReceiver {
@@ -117,6 +119,7 @@ public class PipeConsensusReceiver {
private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics;
private final FolderManager folderManager;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final ReadWriteLock tsFilePieceReadWriteLock = new
ReentrantReadWriteLock(true);
public PipeConsensusReceiver(
PipeConsensus pipeConsensus,
@@ -241,6 +244,9 @@ public class PipeConsensusReceiver {
}
private TPipeConsensusTransferResp loadEvent(final TPipeConsensusTransferReq
req) {
+ if (isClosed.get()) {
+ return
PipeConsensusReceiverAgent.closedResp(consensusPipeName.toString(),
req.getCommitId());
+ }
// synchronized load event, ensured by upper caller's lock.
try {
final short rawRequestType = req.getType();
@@ -329,91 +335,97 @@ public class PipeConsensusReceiver {
private TPipeConsensusTransferResp handleTransferFilePiece(
final PipeConsensusTransferFilePieceReq req, final boolean isSingleFile)
{
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "PipeConsensus-PipeName-{}: starting to receive tsFile pieces",
consensusPipeName);
- }
- long startBorrowTsFileWriterNanos = System.nanoTime();
- PipeConsensusTsFileWriter tsFileWriter =
-
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
- long startPreCheckNanos = System.nanoTime();
- pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(
- startPreCheckNanos - startBorrowTsFileWriterNanos);
-
+ tsFilePieceReadWriteLock.readLock().lock();
try {
- updateWritingFileIfNeeded(tsFileWriter, req.getFileName(), isSingleFile);
- final File writingFile = tsFileWriter.getWritingFile();
- final RandomAccessFile writingFileWriter =
tsFileWriter.getWritingFileWriter();
-
- if (isWritingFileOffsetNonCorrect(tsFileWriter,
req.getStartWritingOffset())) {
- if (!writingFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
- // If the file is a tsFile, then the content will not be changed for
a specific
- // filename. However, for other files (mod, snapshot, etc.) the
content varies for the
- // same name in different times, then we must rewrite the file to
apply the newest
- // version.
- writingFileWriter.setLength(0);
- }
-
- final TSStatus status =
- RpcUtils.getStatus(
- TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET,
- String.format(
- "Request sender to reset file reader's offset from %s to
%s.",
- req.getStartWritingOffset(), writingFileWriter.length()));
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: File offset reset requested by
receiver, response status = {}.",
- consensusPipeName,
- status);
- return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
- status, writingFileWriter.length());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "PipeConsensus-PipeName-{}: starting to receive tsFile pieces",
consensusPipeName);
}
+ long startBorrowTsFileWriterNanos = System.nanoTime();
+ PipeConsensusTsFileWriter tsFileWriter =
+
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
+ long startPreCheckNanos = System.nanoTime();
+ pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(
+ startPreCheckNanos - startBorrowTsFileWriterNanos);
- long endPreCheckNanos = System.nanoTime();
- pipeConsensusReceiverMetrics.recordTsFilePiecePreCheckTime(
- endPreCheckNanos - startPreCheckNanos);
- writingFileWriter.write(req.getFilePiece());
-
pipeConsensusReceiverMetrics.recordTsFilePieceWriteTime(System.nanoTime() -
endPreCheckNanos);
- return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
- RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
- } catch (Exception e) {
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: Failed to write file piece from req {}.",
- consensusPipeName,
- req,
- e);
- final TSStatus status =
- RpcUtils.getStatus(
- TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR,
- String.format("Failed to write file piece, because %s",
e.getMessage()));
try {
- return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
- status, PipeTransferFilePieceResp.ERROR_END_OFFSET);
- } catch (IOException ex) {
- return
PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(status);
- } finally {
- // Exception may occur when disk system go wrong. At this time, we may
reset all resource
- // and receive this file from scratch when leader will try to resend
this file from scratch
- // as well.
- closeCurrentWritingFileWriter(tsFileWriter, false);
- deleteCurrentWritingFile(tsFileWriter);
- // must return tsfileWriter after deleting its file.
- try {
- tsFileWriter.returnSelf(consensusPipeName);
- } catch (IOException | DiskSpaceInsufficientException returnException)
{
+ updateWritingFileIfNeeded(tsFileWriter, req.getFileName(),
isSingleFile);
+ final File writingFile = tsFileWriter.getWritingFile();
+ final RandomAccessFile writingFileWriter =
tsFileWriter.getWritingFileWriter();
+
+ if (isWritingFileOffsetNonCorrect(tsFileWriter,
req.getStartWritingOffset())) {
+ if (!writingFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ // If the file is a tsFile, then the content will not be changed
for a specific
+ // filename. However, for other files (mod, snapshot, etc.) the
content varies for the
+ // same name in different times, then we must rewrite the file to
apply the newest
+ // version.
+ writingFileWriter.setLength(0);
+ }
+
+ final TSStatus status =
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET,
+ String.format(
+ "Request sender to reset file reader's offset from %s to
%s.",
+ req.getStartWritingOffset(),
writingFileWriter.length()));
LOGGER.warn(
- "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.",
+ "PipeConsensus-PipeName-{}: File offset reset requested by
receiver, response status = {}.",
consensusPipeName,
- tsFileWriter,
- returnException);
+ status);
+ return
PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
+ status, writingFileWriter.length());
+ }
+
+ long endPreCheckNanos = System.nanoTime();
+ pipeConsensusReceiverMetrics.recordTsFilePiecePreCheckTime(
+ endPreCheckNanos - startPreCheckNanos);
+ writingFileWriter.write(req.getFilePiece());
+ pipeConsensusReceiverMetrics.recordTsFilePieceWriteTime(
+ System.nanoTime() - endPreCheckNanos);
+ return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
+ RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
+ } catch (Exception e) {
+ LOGGER.warn(
+ "PipeConsensus-PipeName-{}: Failed to write file piece from req
{}.",
+ consensusPipeName,
+ req,
+ e);
+ final TSStatus status =
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR,
+ String.format("Failed to write file piece, because %s",
e.getMessage()));
+ try {
+ return
PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
+ status, PipeTransferFilePieceResp.ERROR_END_OFFSET);
+ } catch (IOException ex) {
+ return
PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(status);
+ } finally {
+ // An exception may occur when the disk system goes wrong. In that case, we
reset all resources
+ // and receive this file from scratch, since the leader will resend
this file from
+ // scratch
+ // as well.
+ closeCurrentWritingFileWriter(tsFileWriter, false);
+ deleteCurrentWritingFile(tsFileWriter);
+ // must return tsfileWriter after deleting its file.
+ try {
+ tsFileWriter.returnSelf(consensusPipeName);
+ } catch (IOException | DiskSpaceInsufficientException
returnException) {
+ LOGGER.warn(
+ "PipeConsensus-PipeName-{}: Failed to return tsFileWriter {}.",
+ consensusPipeName,
+ tsFileWriter,
+ returnException);
+ }
}
}
+ } finally {
+ tsFilePieceReadWriteLock.readLock().unlock();
}
}
private TPipeConsensusTransferResp handleTransferFileSeal(final
PipeConsensusTsFileSealReq req) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("PipeConsensus-PipeName-{}: starting to receive tsFile
seal", consensusPipeName);
- }
+ // TODO: turn it to debug after GA
+ LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tsFile seal",
consensusPipeName);
long startBorrowTsFileWriterNanos = System.nanoTime();
PipeConsensusTsFileWriter tsFileWriter =
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
@@ -531,11 +543,9 @@ public class PipeConsensusReceiver {
private TPipeConsensusTransferResp handleTransferFileSealWithMods(
final PipeConsensusTsFileSealWithModReq req) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "PipeConsensus-PipeName-{}: starting to receive tsFile seal with
mods",
- consensusPipeName);
- }
+ // TODO: turn it to debug after GA
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: starting to receive tsFile seal with
mods", consensusPipeName);
long startBorrowTsFileWriterNanos = System.nanoTime();
PipeConsensusTsFileWriter tsFileWriter =
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
@@ -896,18 +906,28 @@ public class PipeConsensusReceiver {
}
}
- private void deleteFile(File file) {
+ private void deleteFileOrDirectoryIfExists(File file, String reason) {
if (file.exists()) {
try {
- RetryUtils.retryOnException(() -> FileUtils.delete(file));
+ if (file.isDirectory()) {
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.deleteDirectory(file);
+ return null;
+ });
+ } else {
+ RetryUtils.retryOnException(() -> FileUtils.delete(file));
+ }
LOGGER.info(
- "PipeConsensus-PipeName-{}: Original writing file {} was deleted.",
+ "PipeConsensus-PipeName-{}: {} {} was deleted.",
consensusPipeName,
+ reason,
file.getPath());
} catch (IOException e) {
LOGGER.warn(
- "PipeConsensus-PipeName-{}: Failed to delete original writing file
{}, because {}.",
+ "PipeConsensus-PipeName-{}: {} Failed to delete {}, because {}.",
consensusPipeName,
+ reason,
file.getPath(),
e.getMessage(),
e);
@@ -915,8 +935,9 @@ public class PipeConsensusReceiver {
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "PipeConsensus-PipeName-{}: Original file {} is not existed. No
need to delete.",
+ "PipeConsensus-PipeName-{}: {} {} is not existed. No need to
delete.",
consensusPipeName,
+ reason,
file.getPath());
}
}
@@ -924,7 +945,9 @@ public class PipeConsensusReceiver {
private void deleteCurrentWritingFile(PipeConsensusTsFileWriter
tsFileWriter) {
if (tsFileWriter.getWritingFile() != null) {
- deleteFile(tsFileWriter.getWritingFile());
+ deleteFileOrDirectoryIfExists(
+ tsFileWriter.getWritingFile(),
+ String.format("TsFileWriter-%s delete current writing file",
tsFileWriter.index));
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
@@ -1007,17 +1030,8 @@ public class PipeConsensusReceiver {
consensusPipeName, newReceiverDir.getPath()));
}
// Remove exists dir
- if (newReceiverDir.exists()) {
- RetryUtils.retryOnException(
- () -> {
- FileUtils.deleteDirectory(newReceiverDir);
- return null;
- });
- LOGGER.info(
- "PipeConsensus-PipeName-{}: Origin receiver file dir {} was
deleted.",
- consensusPipeName,
- newReceiverDir.getPath());
- }
+ deleteFileOrDirectoryIfExists(newReceiverDir, "Initial Receiver: delete
origin receive dir");
+
if (!newReceiverDir.mkdirs()) {
LOGGER.warn(
"PipeConsensus-PipeName-{}: Failed to create receiver file dir {}.
May because authority or dir already exists etc.",
@@ -1036,32 +1050,7 @@ public class PipeConsensusReceiver {
// Clear the original receiver file dir if exists
for (String receiverFileBaseDir : receiveDirs) {
File receiverDir = new File(receiverFileBaseDir);
- if (receiverDir.exists()) {
- try {
- RetryUtils.retryOnException(
- () -> {
- FileUtils.deleteDirectory(receiverDir);
- return null;
- });
- LOGGER.info(
- "PipeConsensus-PipeName-{}: Original receiver file dir {} was
deleted successfully..",
- consensusPipeName,
- receiverDir.getPath());
- } catch (IOException e) {
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: Failed to delete original receiver
file dir {}",
- consensusPipeName,
- receiverDir.getPath(),
- e);
- }
- } else {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "PipeConsensus-PipeName-{}: Receiver exit: Original receiver
file dir {} does not exist. No need to delete.",
- consensusPipeName,
- receiverDir.getPath());
- }
- }
+ deleteFileOrDirectoryIfExists(receiverDir, "Clear receive dir manually");
}
}
@@ -1072,9 +1061,6 @@ public class PipeConsensusReceiver {
public synchronized void handleExit() {
// only after closing request executor, can we clean receiver.
requestExecutor.tryClose();
- // Clear the tsFileWriters, receiverBuffer and receiver base dirs
- requestExecutor.clear(false);
- clearAllReceiverBaseDir();
// remove metric
MetricService.getInstance().removeMetricSet(pipeConsensusReceiverMetrics);
// cancel periodic task
@@ -1082,8 +1068,9 @@ public class PipeConsensusReceiver {
tsFileWriterCheckerFuture.cancel(false);
tsFileWriterCheckerFuture = null;
}
- LOGGER.info(
- "PipeConsensus-PipeName-{}: Receiver exit: Receiver exited.",
consensusPipeName.toString());
+ // Clear the tsFileWriters, receiverBuffer and receiver base dirs
+ requestExecutor.clear(false, true);
+ LOGGER.info("Receiver-{} exit successfully.",
consensusPipeName.toString());
}
private class PipeConsensusTsFileWriterPool {
@@ -1120,7 +1107,9 @@ public class PipeConsensusReceiver {
Optional<PipeConsensusTsFileWriter> tsFileWriter =
pipeConsensusTsFileWriterPool.stream()
.filter(
- item -> Objects.equals(commitId,
item.getCommitIdOfCorrespondingHolderEvent()))
+ item ->
+ item.isUsed()
+ && Objects.equals(commitId,
item.getCommitIdOfCorrespondingHolderEvent()))
.findFirst();
// If the TsFileInsertionEvent is first using tsFileWriter, we will find
the first available
@@ -1178,7 +1167,7 @@ public class PipeConsensusReceiver {
});
}
- public void handleExit(ConsensusPipeName consensusPipeName) {
+ public void releaseAllWriters(ConsensusPipeName consensusPipeName) {
pipeConsensusTsFileWriterPool.forEach(
tsFileWriter -> {
// Wait until tsFileWriter is not used by TsFileInsertionEvent or
timeout.
@@ -1248,25 +1237,11 @@ public class PipeConsensusReceiver {
}
String localWritingDirPath = receiverBasePath + File.separator + index;
- LOGGER.info(
- "PipeConsensus-PipeName-{}: tsfileWriter-{} roll to writing path {}",
- consensusPipeName,
- index,
- localWritingDirPath);
this.localWritingDir = new File(localWritingDirPath);
// Remove exists dir
- if (this.localWritingDir.exists()) {
- RetryUtils.retryOnException(
- () -> {
- FileUtils.deleteDirectory(this.localWritingDir);
- return null;
- });
- LOGGER.info(
- "PipeConsensus-PipeName-{}: Origin receiver tsFileWriter-{} file
dir {} was deleted.",
- consensusPipeName,
- index,
- this.localWritingDir.getPath());
- }
+ deleteFileOrDirectoryIfExists(
+ this.localWritingDir,
+ String.format("TsFileWriter-%s roll to new dir and delete last
writing dir", index));
if (!this.localWritingDir.mkdirs()) {
LOGGER.warn(
"PipeConsensus-PipeName-{}: Failed to create receiver
tsFileWriter-{} file dir {}. May because authority or dir already exists etc.",
@@ -1278,6 +1253,11 @@ public class PipeConsensusReceiver {
"PipeConsensus-PipeName-%s: Failed to create tsFileWriter-%d
receiver file dir %s. May because authority or dir already exists etc.",
consensusPipeName, index, this.localWritingDir.getPath()));
}
+ LOGGER.info(
+ "PipeConsensus-PipeName-{}: tsfileWriter-{} roll to writing path {}",
+ consensusPipeName,
+ index,
+ localWritingDirPath);
}
public File getLocalWritingDir() {
@@ -1290,6 +1270,13 @@ public class PipeConsensusReceiver {
public void setWritingFile(File writingFile) {
this.writingFile = writingFile;
+ // TODO: downgrade this log to debug level after GA
+ if (writingFile == null) {
+ LOGGER.info(
+ "PipeConsensus-{}: TsFileWriter-{} set null writing file",
+ consensusPipeName.toString(),
+ index);
+ }
}
public RandomAccessFile getWritingFileWriter() {
@@ -1298,6 +1285,13 @@ public class PipeConsensusReceiver {
public void setWritingFileWriter(RandomAccessFile writingFileWriter) {
this.writingFileWriter = writingFileWriter;
+ // TODO: downgrade this log to debug level after GA
+ if (writingFileWriter == null) {
+ LOGGER.info(
+ "PipeConsensus-{}: TsFileWriter-{} set null writing file writer",
+ consensusPipeName.toString(),
+ index);
+ }
}
public TCommitId getCommitIdOfCorrespondingHolderEvent() {
@@ -1367,19 +1361,8 @@ public class PipeConsensusReceiver {
// close file
if (writingFile != null) {
- try {
- RetryUtils.retryOnException(() -> FileUtils.delete(writingFile));
- LOGGER.info(
- "PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file {}
was deleted.",
- consensusPipeName,
- writingFile.getPath());
- } catch (Exception e) {
- LOGGER.warn(
- "PipeConsensus-PipeName-{}: TsFileWriter exit: Delete writing
file {} error.",
- consensusPipeName,
- writingFile.getPath(),
- e);
- }
+ deleteFileOrDirectoryIfExists(
+ writingFile, String.format("TsFileWriter-%s exit: delete writing
file", this.index));
setWritingFile(null);
} else {
if (LOGGER.isDebugEnabled()) {
@@ -1432,12 +1415,10 @@ public class PipeConsensusReceiver {
long startAcquireLockNanos = System.nanoTime();
lock.lock();
try {
- // once thread gets lock, it will judge whether receiver is closed
if (isClosed.get()) {
return PipeConsensusReceiverAgent.closedResp(
consensusPipeName.toString(), req.getCommitId());
}
-
long startDispatchNanos = System.nanoTime();
metric.recordAcquireExecutorLockTimer(startDispatchNanos -
startAcquireLockNanos);
@@ -1552,12 +1533,10 @@ public class PipeConsensusReceiver {
!condition.await(
PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS,
TimeUnit.MILLISECONDS);
- // once thread gets lock, it will judge whether receiver is
closed
if (isClosed.get()) {
return PipeConsensusReceiverAgent.closedResp(
consensusPipeName.toString(), req.getCommitId());
}
-
// If some reqs find the buffer no longer contains their
requestMeta after jumping out
// from condition.await, it may indicate that during their wait,
some reqs with newer
// pipeTaskStartTimes or rebootTimes came in and refreshed the
requestBuffer. In that
@@ -1622,7 +1601,7 @@ public class PipeConsensusReceiver {
consensusPipeName);
// since pipe task will resend all data that hasn't synchronized after
dataNode reboots, it's
// safe to clear all events in buffer.
- clear(true);
+ clear(true, false);
// sync the follower's connectorRebootTimes with connector's actual
rebootTimes.
this.connectorRebootTimes = connectorRebootTimes;
this.pipeTaskRestartTimes = 0;
@@ -1634,7 +1613,7 @@ public class PipeConsensusReceiver {
consensusPipeName);
// since pipe task will resend all data that hasn't synchronized after
restarts, it's safe to
// clear all events in buffer.
- clear(false);
+ clear(false, false);
this.pipeTaskRestartTimes = pipeTaskRestartTimes;
}
@@ -1655,11 +1634,21 @@ public class PipeConsensusReceiver {
}
}
- private void clear(boolean resetSyncIndex) {
- this.reqExecutionOrderBuffer.clear();
- this.tsFileWriterPool.handleExit(consensusPipeName);
- if (resetSyncIndex) {
- this.onSyncedReplicateIndex = 0;
+ private void clear(boolean resetSyncIndex, boolean cleanBaseDir) {
+ // TsFile piece writing may happen outside of RequestExecutor.lock, so we
must use an additional
+ // lock here to ensure serial execution of cleanup and piece writing.
+ tsFilePieceReadWriteLock.writeLock().lock();
+ try {
+ this.reqExecutionOrderBuffer.clear();
+ this.tsFileWriterPool.releaseAllWriters(consensusPipeName);
+ if (resetSyncIndex) {
+ this.onSyncedReplicateIndex = 0;
+ }
+ if (cleanBaseDir) {
+ clearAllReceiverBaseDir();
+ }
+ } finally {
+ tsFilePieceReadWriteLock.writeLock().unlock();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
index 6d3bef50a00..92af030e623 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiverAgent.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.pipe.receiver.protocol.pipeconsensus;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.PipeConsensusRequestVersion;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.pipe.PipeConsensus;
@@ -69,7 +71,7 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
ConsensusGroupId, Map<ConsensusPipeName,
AtomicReference<PipeConsensusReceiver>>>
replicaReceiverMap = new ConcurrentHashMap<>();
- private final Set<ConsensusPipeName> createdConsensusPipes = new
CopyOnWriteArraySet<>();
+ private final Set<ConsensusPipeName> aliveReceivers = new
CopyOnWriteArraySet<>();
private PipeConsensus pipeConsensus;
@@ -97,7 +99,7 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
TSStatusCode.PIPE_CONSENSUS_CLOSE_ERROR,
"PipeConsensus receiver received a request after it was
closed."));
LOGGER.info(
- "PipeConsensus-{}: receive on-the-fly no.{} event after consensus pipe
was dropped, discard it",
+ "PipeConsensus-{}: receive on-the-fly no.{} event after data region
was deleted, discard it",
consensusInfo,
tCommitId);
return new TPipeConsensusTransferResp(status);
@@ -135,11 +137,6 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
// 2. Route to given consensusPipeTask's receiver
ConsensusPipeName consensusPipeName =
new ConsensusPipeName(consensusGroupId, leaderDataNodeId, thisNodeId);
- // 3. Judge whether pipe task was dropped
- if (!createdConsensusPipes.contains(consensusPipeName)) {
- return null;
- }
-
AtomicBoolean isFirstGetReceiver = new AtomicBoolean(false);
AtomicReference<PipeConsensusReceiver> receiverReference =
consensusPipe2ReceiverMap.computeIfAbsent(
@@ -149,6 +146,11 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
return new AtomicReference<>(null);
});
+ // 3. If this is not the first time the receiver is fetched and the receiver is not alive, return null.
+ if (!isFirstGetReceiver.get() &&
!aliveReceivers.contains(consensusPipeName)) {
+ return null;
+ }
+
if (receiverReference.get() == null) {
return internalSetAndGetReceiver(
consensusGroupId, consensusPipeName, reqVersion, isFirstGetReceiver);
@@ -182,10 +184,13 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
if (isFirstGetReceiver.get()) {
if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
+ // If this is the first time the receiver is fetched, mark this receiver as alive.
+ aliveReceivers.add(consensusPipeName);
receiverReference.set(
RECEIVER_CONSTRUCTORS
.get(reqVersion)
.apply(pipeConsensus, consensusGroupId, consensusPipeName));
+ LOGGER.info("Receiver-{} is ready", consensusPipeName);
} else {
throw new UnsupportedOperationException(
String.format("Unsupported pipeConsensus request version %d",
reqVersion));
@@ -210,25 +215,33 @@ public class PipeConsensusReceiverAgent implements
ConsensusPipeReceiver {
}
}
- /** Release receiver of given pipeConsensusTask */
+ /** Release all receivers of given data region */
@Override
- public final void handleDropPipeConsensusTask(ConsensusPipeName pipeName) {
+ public final void releaseReceiverResource(DataRegionId dataRegionId) {
// 1. Route to given consensusGroup's receiver map
Map<ConsensusPipeName, AtomicReference<PipeConsensusReceiver>>
consensusPipe2ReciverMap =
- replicaReceiverMap.getOrDefault(pipeName.getConsensusGroupId(), new
ConcurrentHashMap<>());
- // 2. Route to given consensusPipeTask's receiver
- AtomicReference<PipeConsensusReceiver> receiverReference =
- consensusPipe2ReciverMap.getOrDefault(pipeName, null);
- // 3. Release receiver
- if (receiverReference != null) {
- createdConsensusPipes.remove(pipeName);
- receiverReference.get().handleExit();
- receiverReference.set(null);
- consensusPipe2ReciverMap.remove(pipeName);
- }
- }
-
- public void markConsensusPipeAsCreated(ConsensusPipeName pipeName) {
- createdConsensusPipes.add(pipeName);
+ this.replicaReceiverMap.getOrDefault(
+ ConsensusGroupId.Factory.create(
+ TConsensusGroupType.DataRegion.getValue(),
dataRegionId.getId()),
+ new ConcurrentHashMap<>());
+ // 2. Release all related receivers
+ consensusPipe2ReciverMap.entrySet().stream()
+ .filter(entry -> entry.getKey().getReceiverDataNodeId() == thisNodeId)
+ .forEach(
+ receiverEntry -> {
+ ConsensusPipeName consensusPipeName = receiverEntry.getKey();
+ AtomicReference<PipeConsensusReceiver> receiverReference =
receiverEntry.getValue();
+ if (receiverReference != null) {
+ // no longer receive new request
+ aliveReceivers.remove(consensusPipeName);
+ receiverReference.get().handleExit();
+ receiverReference.set(null);
+ }
+ });
+ // 3. Release replica map
+ this.replicaReceiverMap.remove(dataRegionId);
+ // 4. GC receiver map
+ consensusPipe2ReciverMap.clear();
+ LOGGER.info("All Receivers related to {} are released.", dataRegionId);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index d684367787d..cf24235f9ed 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -781,6 +781,7 @@ public class StorageEngine implements IService {
region.syncDeleteDataFiles();
region.deleteFolder(systemDir);
region.deleteDALFolderAndClose();
+
PipeDataNodeAgent.receiver().pipeConsensus().releaseReceiverResource(regionId);
switch (CONFIG.getDataRegionConsensusProtocolClass()) {
case ConsensusFactory.IOT_CONSENSUS:
case ConsensusFactory.IOT_CONSENSUS_V2: