This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0492b6a9a83 Pipe: Fix receiver can not delete dir on thread exit if
files in dir are not loaded successfully (#12302)
0492b6a9a83 is described below
commit 0492b6a9a83f3de3aae0cd7370f5b3c6f469f8b4
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Apr 8 17:29:50 2024 +0800
Pipe: Fix receiver can not delete dir on thread exit if files in dir are
not loaded successfully (#12302)
---
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 31 ++-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 20 +-
.../resource/tsfile/PipeTsFileResourceManager.java | 1 -
.../commons/pipe/receiver/IoTDBFileReceiver.java | 263 ++++++++++++++-------
4 files changed, 218 insertions(+), 97 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 28e08e231a8..6f2fb7966db 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -88,13 +88,13 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
private static final AtomicInteger QUERY_ID_GENERATOR = new AtomicInteger(0);
- private final ConfigManager configManager =
ConfigNode.getInstance().getConfigManager();
-
- private static final PipeConfigPhysicalPlanTSStatusVisitor statusVisitor =
+ private static final PipeConfigPhysicalPlanTSStatusVisitor STATUS_VISITOR =
new PipeConfigPhysicalPlanTSStatusVisitor();
- private static final PipeConfigPhysicalPlanExceptionVisitor exceptionVisitor
=
+ private static final PipeConfigPhysicalPlanExceptionVisitor
EXCEPTION_VISITOR =
new PipeConfigPhysicalPlanExceptionVisitor();
+ private final ConfigManager configManager =
ConfigNode.getInstance().getConfigManager();
+
@Override
public TPipeTransferResp receive(final TPipeTransferReq req) {
try {
@@ -135,13 +135,16 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
RpcUtils.getStatus(
TSStatusCode.PIPE_TYPE_ERROR,
String.format("Unsupported PipeRequestType on ConfigNode %s.",
rawRequestType));
- LOGGER.warn("Unsupported PipeRequestType on ConfigNode, response status
= {}.", status);
+ LOGGER.warn(
+ "Receiver id = {}: Unsupported PipeRequestType on ConfigNode,
response status = {}.",
+ receiverId.get(),
+ status);
return new TPipeTransferResp(status);
} catch (Exception e) {
final String error =
"Exception encountered while handling pipe transfer request. Root
cause: "
+ e.getMessage();
- LOGGER.warn(error, e);
+ LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR,
error));
}
}
@@ -167,12 +170,20 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
try {
result = executePlan(plan);
if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.warn("Failure status encountered while executing plan {}: {}",
plan, result);
- result = statusVisitor.process(plan, result);
+ LOGGER.warn(
+ "Receiver id = {}: Failure status encountered while executing plan
{}: {}",
+ receiverId.get(),
+ plan,
+ result);
+ result = STATUS_VISITOR.process(plan, result);
}
} catch (Exception e) {
- LOGGER.warn("Exception encountered while executing plan {}: ", plan, e);
- result = exceptionVisitor.process(plan, e);
+ LOGGER.warn(
+ "Receiver id = {}: Exception encountered while executing plan {}: ",
+ receiverId.get(),
+ plan,
+ e);
+ result = EXCEPTION_VISITOR.process(plan, e);
}
return result;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 783069c566a..94f288a0f6d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -182,11 +182,14 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
RpcUtils.getStatus(
TSStatusCode.PIPE_TYPE_ERROR,
String.format("Unknown PipeRequestType %s.", rawRequestType));
- LOGGER.warn("Unknown PipeRequestType, response status = {}.", status);
+ LOGGER.warn(
+ "Receiver id = {}: Unknown PipeRequestType, response status = {}.",
+ receiverId.get(),
+ status);
return new TPipeTransferResp(status);
} catch (IOException e) {
- String error = String.format("Serialization error during pipe receiving,
%s", e);
- LOGGER.warn(error);
+ final String error = String.format("Serialization error during pipe
receiving, %s", e);
+ LOGGER.warn("Receiver id = {}: {}", receiverId.get(), error, e);
return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR,
error));
}
}
@@ -312,11 +315,18 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return result;
} else {
LOGGER.warn(
- "Failure status encountered while executing statement {}: {}",
statement, result);
+ "Receiver id = {}: Failure status encountered while executing
statement {}: {}",
+ receiverId.get(),
+ statement,
+ result);
return statement.accept(statusVisitor, result);
}
} catch (Exception e) {
- LOGGER.warn("Exception encountered while executing statement {}: ",
statement, e);
+ LOGGER.warn(
+ "Receiver id = {}: Exception encountered while executing statement
{}: ",
+ receiverId.get(),
+ statement,
+ e);
return statement.accept(exceptionVisitor, e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 33ef8b02636..5dd63847866 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -214,7 +214,6 @@ public class PipeTsFileResourceManager {
* delete the file. if the given file is not a hardlink or copied file, do
nothing.
*
* @param hardlinkOrCopiedFile the copied or hardlinked file
- * @throws IOException when delete file failed
*/
public void decreaseFileReference(File hardlinkOrCopiedFile) {
lock.lock();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index fe0a2644f04..79fed02b1e5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -37,13 +37,13 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.nio.file.Files;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
@@ -61,7 +61,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
// Used to generate transfer id, which is used to identify a receiver thread.
private static final AtomicLong RECEIVER_ID_GENERATOR = new AtomicLong(0);
- private final AtomicLong receiverId = new AtomicLong(0);
+ protected final AtomicLong receiverId = new AtomicLong(0);
private File writingFile;
private RandomAccessFile writingFileWriter;
@@ -94,53 +94,69 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
if (receiverFileDirWithIdSuffix.get() != null) {
if (receiverFileDirWithIdSuffix.get().exists()) {
try {
- Files.delete(receiverFileDirWithIdSuffix.get().toPath());
+ FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
LOGGER.info(
- "Original receiver file dir {} was deleted.",
+ "Receiver id = {}: Original receiver file dir {} was deleted.",
+ receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
} catch (IOException e) {
LOGGER.warn(
- "Failed to delete original receiver file dir {}, because {}.",
+ "Receiver id = {}: Failed to delete original receiver file dir
{}, because {}.",
+ receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath(),
- e.getMessage());
+ e.getMessage(),
+ e);
}
} else {
- LOGGER.info(
- "Original receiver file dir {} is not existed. No need to delete.",
- receiverFileDirWithIdSuffix.get().getPath());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Receiver id = {}: Original receiver file dir {} is not existed.
No need to delete.",
+ receiverId.get(),
+ receiverFileDirWithIdSuffix.get().getPath());
+ }
}
receiverFileDirWithIdSuffix.set(null);
} else {
- LOGGER.info("Current receiver file dir is null. No need to delete.");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Receiver id = {}: Current receiver file dir is null. No need to
delete.",
+ receiverId.get());
+ }
}
final String receiverFileBaseDir;
try {
receiverFileBaseDir = getReceiverFileBaseDir();
if (Objects.isNull(receiverFileBaseDir)) {
- LOGGER.error(
- "Failed to init pipe receiver file folder manager because all
disks of folders are full.");
+ LOGGER.warn(
+ "Receiver id = {}: Failed to init pipe receiver file folder
manager because all disks of folders are full.",
+ receiverId.get());
return new
TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
}
} catch (Exception e) {
- LOGGER.error(
- "Fail to create pipe receiver file folder because all disks of
folders are full.", e);
+ LOGGER.warn(
+ "Receiver id = {}: Failed to create pipe receiver file folder
because all disks of folders are full.",
+ receiverId.get(),
+ e);
return new
TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
}
// Create a new receiver file dir
final File newReceiverDir = new File(receiverFileBaseDir,
Long.toString(receiverId.get()));
- if (!newReceiverDir.exists()) {
- if (newReceiverDir.mkdirs()) {
- LOGGER.info("Receiver file dir {} was created.",
newReceiverDir.getPath());
- } else {
- LOGGER.error("Failed to create receiver file dir {}.",
newReceiverDir.getPath());
- }
+ if (!newReceiverDir.exists() && !newReceiverDir.mkdirs()) {
+ LOGGER.warn(
+ "Receiver id = {}: Failed to create receiver file dir {}.",
+ receiverId.get(),
+ newReceiverDir.getPath());
+ return new TPipeTransferResp(
+ RpcUtils.getStatus(
+ TSStatusCode.PIPE_HANDSHAKE_ERROR,
+ String.format("Failed to create receiver file dir %s.",
newReceiverDir.getPath())));
}
receiverFileDirWithIdSuffix.set(newReceiverDir);
LOGGER.info(
- "Handshake successfully, receiver id = {}, receiver file dir = {}.",
+ "Receiver id = {}: Handshake successfully, receiver file dir = {}.",
receiverId.get(),
newReceiverDir.getPath());
return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
@@ -157,7 +173,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR,
"Receiver can not get clusterId from config node.");
- LOGGER.warn("Handshake failed, response status = {}.", status);
+ LOGGER.warn(
+ "Receiver id = {}: Handshake failed, response status = {}.",
receiverId.get(), status);
return new TPipeTransferResp(status);
}
@@ -168,7 +185,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR, "Handshake request does not
contain clusterId.");
- LOGGER.warn("Handshake failed, response status = {}.", status);
+ LOGGER.warn(
+ "Receiver id = {}: Handshake failed, response status = {}.",
receiverId.get(), status);
return new TPipeTransferResp(status);
}
@@ -180,7 +198,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
String.format(
"Receiver and sender are from the same cluster %s.",
clusterIdFromHandshakeRequest));
- LOGGER.warn("Handshake failed, response status = {}.", status);
+ LOGGER.warn(
+ "Receiver id = {}: Handshake failed, response status = {}.",
receiverId.get(), status);
return new TPipeTransferResp(status);
}
@@ -192,7 +211,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR,
"Handshake request does not contain timestampPrecision.");
- LOGGER.warn("Handshake failed, response status = {}.", status);
+ LOGGER.warn(
+ "Receiver id = {}: Handshake failed, response status = {}.",
receiverId.get(), status);
return new TPipeTransferResp(status);
}
@@ -237,7 +257,10 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
String.format(
"Request sender to reset file reader's offset from %s to
%s.",
req.getStartWritingOffset(), writingFileWriter.length()));
- LOGGER.warn("File offset reset requested by receiver, response status
= {}.", status);
+ LOGGER.warn(
+ "Receiver id = {}: File offset reset requested by receiver,
response status = {}.",
+ receiverId.get(),
+ status);
return PipeTransferFilePieceResp.toTPipeTransferResp(status,
writingFileWriter.length());
}
@@ -246,7 +269,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
return PipeTransferFilePieceResp.toTPipeTransferResp(
RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
} catch (Exception e) {
- LOGGER.warn("Failed to write file piece from req {}.", req, e);
+ LOGGER.warn(
+ "Receiver id = {}: Failed to write file piece from req {}.",
receiverId.get(), req, e);
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -267,8 +291,9 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
LOGGER.info(
- "Writing file {} is not existed or name is not correct, try to create
it. "
+ "Receiver id = {}: Writing file {} is not existed or name is not
correct, try to create it. "
+ "Current writing file is {}.",
+ receiverId.get(),
fileName,
writingFile == null ? "null" : writingFile.getPath());
@@ -284,16 +309,23 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
if (!receiverFileDirWithIdSuffix.get().exists()) {
if (receiverFileDirWithIdSuffix.get().mkdirs()) {
LOGGER.info(
- "Receiver file dir {} was created.",
receiverFileDirWithIdSuffix.get().getPath());
+ "Receiver id = {}: Receiver file dir {} was created.",
+ receiverId.get(),
+ receiverFileDirWithIdSuffix.get().getPath());
} else {
LOGGER.error(
- "Failed to create receiver file dir {}.",
receiverFileDirWithIdSuffix.get().getPath());
+ "Receiver id = {}: Failed to create receiver file dir {}.",
+ receiverId.get(),
+ receiverFileDirWithIdSuffix.get().getPath());
}
}
writingFile = new File(receiverFileDirWithIdSuffix.get(), fileName);
writingFileWriter = new RandomAccessFile(writingFile, "rw");
- LOGGER.info("Writing file {} was created. Ready to write file pieces.",
writingFile.getPath());
+ LOGGER.info(
+ "Receiver id = {}: Writing file {} was created. Ready to write file
pieces.",
+ receiverId.get(),
+ writingFile.getPath());
}
private boolean isFileExistedAndNameCorrect(String fileName) {
@@ -305,17 +337,24 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
try {
writingFileWriter.close();
LOGGER.info(
- "Current writing file writer {} was closed.",
+ "Receiver id = {}: Current writing file writer {} was closed.",
+ receiverId.get(),
writingFile == null ? "null" : writingFile.getPath());
} catch (IOException e) {
LOGGER.warn(
- "Failed to close current writing file writer {}, because {}.",
+ "Receiver id = {}: Failed to close current writing file writer {},
because {}.",
+ receiverId.get(),
writingFile == null ? "null" : writingFile.getPath(),
- e.getMessage());
+ e.getMessage(),
+ e);
}
writingFileWriter = null;
} else {
- LOGGER.info("Current writing file writer is null. No need to close.");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Receiver id = {}: Current writing file writer is null. No need to
close.",
+ receiverId.get());
+ }
}
}
@@ -323,23 +362,36 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
if (writingFile != null) {
deleteFile(writingFile);
} else {
- LOGGER.info("Current writing file is null. No need to delete.");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Receiver id = {}: Current writing file is null. No need to
delete.", receiverId.get());
+ }
}
}
private void deleteFile(File file) {
if (file.exists()) {
try {
- Files.delete(file.toPath());
- LOGGER.info("Original writing file {} was deleted.", file.getPath());
+ FileUtils.delete(file);
+ LOGGER.info(
+ "Receiver id = {}: Original writing file {} was deleted.",
+ receiverId.get(),
+ file.getPath());
} catch (IOException e) {
LOGGER.warn(
- "Failed to delete original writing file {}, because {}.",
+ "Receiver id = {}: Failed to delete original writing file {},
because {}.",
+ receiverId.get(),
file.getPath(),
- e.getMessage());
+ e.getMessage(),
+ e);
}
} else {
- LOGGER.info("Original file {} is not existed. No need to delete.",
file.getPath());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Receiver id = {}: Original file {} is not existed. No need to
delete.",
+ receiverId.get(),
+ file.getPath());
+ }
}
}
@@ -347,7 +399,8 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
final boolean offsetCorrect = writingFileWriter.length() == offset;
if (!offsetCorrect) {
LOGGER.warn(
- "Writing file {}'s offset is {}, but request sender's offset is {}.",
+ "Receiver id = {}: Writing file {}'s offset is {}, but request
sender's offset is {}.",
+ receiverId.get(),
writingFile.getPath(),
writingFileWriter.length(),
offset);
@@ -392,20 +445,21 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
final TSStatus status = loadFileV1(req, fileAbsolutePath);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
- "Seal file {} successfully. Receiver id is {}.", fileAbsolutePath,
receiverId.get());
+ "Receiver id = {}: Seal file {} successfully.", receiverId.get(),
fileAbsolutePath);
} else {
LOGGER.warn(
- "Failed to seal file {}, because {}. Receiver id is {}.",
+ "Receiver id = {}: Failed to seal file {}, because {}.",
+ receiverId.get(),
fileAbsolutePath,
- status.getMessage(),
- receiverId.get());
+ status.getMessage());
}
return new TPipeTransferResp(status);
} catch (IOException e) {
LOGGER.warn(
- String.format(
- "Failed to seal file %s from req %s. Receiver id is %d.",
- writingFile, req, receiverId.get()),
+ "Receiver id = {}: Failed to seal file {} from req {}.",
+ receiverId.get(),
+ writingFile,
+ req,
e);
return new TPipeTransferResp(
RpcUtils.getStatus(
@@ -413,7 +467,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
String.format("Failed to seal file %s because %s", writingFile,
e.getMessage())));
} finally {
// If the writing file is not sealed successfully, the writing file will
be deleted.
- // All pieces of the writing file and its mod(if exists) should be
retransmitted by the
+ // All pieces of the writing file and its mod (if exists) should be
retransmitted by the
// sender.
closeCurrentWritingFileWriter();
deleteCurrentWritingFile();
@@ -471,21 +525,18 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
final TSStatus status = loadFileV2(req, fileAbsolutePaths);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
- "Seal file {} successfully. Receiver id is {}.",
fileAbsolutePaths, receiverId.get());
+ "Receiver id = {}: Seal file {} successfully.", receiverId.get(),
fileAbsolutePaths);
} else {
LOGGER.warn(
- "Failed to seal file {}, status is {}. Receiver id is {}.",
+ "Receiver id = {}: Failed to seal file {}, status is {}.",
+ receiverId.get(),
fileAbsolutePaths,
- status,
- receiverId.get());
+ status);
}
return new TPipeTransferResp(status);
} catch (IOException | IllegalPathException e) {
LOGGER.warn(
- String.format(
- "Failed to seal file %s from req %s. Receiver id is %d.",
- writingFile, req, receiverId.get()),
- e);
+ "Receiver id = {}: Failed to seal file {} from req {}.",
receiverId.get(), files, req, e);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -508,7 +559,10 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format("Failed to seal file %s, the file does not
exist.", fileName));
- LOGGER.warn(status.getMessage());
+ LOGGER.warn(
+ "Receiver id = {}: Failed to seal file {}, because the file does not
exist.",
+ receiverId.get(),
+ fileName);
return new TPipeTransferResp(status);
}
@@ -520,9 +574,16 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
"Failed to seal file %s, because the length of file is not
correct. "
+ "The original file has length %s, but receiver file
has length %s.",
fileName, fileLength, writingFileWriter.length()));
- LOGGER.warn(status.getMessage());
+ LOGGER.warn(
+ "Receiver id = {}: Failed to seal file {}, because the length of
file is not correct. "
+ + "The original file has length {}, but receiver file has length
{}.",
+ receiverId.get(),
+ fileName,
+ fileLength,
+ writingFileWriter.length());
return new TPipeTransferResp(status);
}
+
return null;
}
@@ -533,8 +594,12 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
String.format(
- "Failed to seal file %s, but writing file is %s.", fileName,
writingFile));
- LOGGER.warn(status.getMessage());
+ "Failed to seal file %s, because writing file is %s.",
fileName, writingFile));
+ LOGGER.warn(
+ "Receiver id = {}: Failed to seal file {}, because writing file is
{}.",
+ receiverId.get(),
+ fileName,
+ writingFile);
return new TPipeTransferResp(status);
}
@@ -546,9 +611,16 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
"Failed to seal file %s, because the length of file is not
correct. "
+ "The original file has length %s, but receiver file
has length %s.",
fileName, fileLength, writingFileWriter.length()));
- LOGGER.warn(status.getMessage());
+ LOGGER.warn(
+ "Receiver id = {}: Failed to seal file {}, because the length of
file is not correct. "
+ + "The original file has length {}, but receiver file has length
{}.",
+ receiverId.get(),
+ fileName,
+ fileLength,
+ writingFileWriter.length());
return new TPipeTransferResp(status);
}
+
return null;
}
@@ -557,7 +629,9 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
writingFile != null && writingFile.exists() && writingFileWriter !=
null;
if (!isWritingFileAvailable) {
LOGGER.info(
- "Writing file {} is not available. Writing file is null: {}, writing
file exists: {}, writing file writer is null: {}.",
+ "Receiver id = {}: Writing file {} is not available. "
+ + "Writing file is null: {}, writing file exists: {}, writing
file writer is null: {}.",
+ receiverId.get(),
writingFile,
writingFile == null,
writingFile != null && writingFile.exists(),
@@ -578,52 +652,79 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
if (writingFileWriter != null) {
try {
writingFileWriter.close();
- LOGGER.info("IoTDBFileReceiverV1#handleExit: writing file writer was
closed.");
+ LOGGER.info(
+ "Receiver id = {}: Handling exit: Writing file writer was
closed.", receiverId.get());
} catch (Exception e) {
- LOGGER.warn("IoTDBFileReceiverV1#handleExit: close writing file writer
error.", e);
+ LOGGER.warn(
+ "Receiver id = {}: Handling exit: Close writing file writer
error.",
+ receiverId.get(),
+ e);
}
writingFileWriter = null;
} else {
- LOGGER.info("IoTDBFileReceiverV1#handleExit: writing file writer is
null. No need to close.");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Receiver id = {}: Handling exit: Writing file writer is null. No
need to close.",
+ receiverId.get());
+ }
}
if (writingFile != null) {
try {
- Files.delete(writingFile.toPath());
+ FileUtils.delete(writingFile);
LOGGER.info(
- "IoTDBFileReceiverV1#handleExit: writing file {} was deleted.",
writingFile.getPath());
+ "Receiver id = {}: Handling exit: Writing file {} was deleted.",
+ receiverId.get(),
+ writingFile.getPath());
} catch (Exception e) {
- LOGGER.warn("IoTDBFileReceiverV1#handleExit: delete file {} error.",
writingFile.getPath());
+ LOGGER.warn(
+ "Receiver id = {}: Handling exit: Delete writing file {} error.",
+ receiverId.get(),
+ writingFile.getPath(),
+ e);
}
writingFile = null;
} else {
- LOGGER.info("IoTDBFileReceiverV1#handleExit: writing file is null. No
need to delete.");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Receiver id = {}: Handling exit: Writing file is null. No need to
delete.",
+ receiverId.get());
+ }
}
// Clear the original receiver file dir if exists
if (receiverFileDirWithIdSuffix.get() != null) {
if (receiverFileDirWithIdSuffix.get().exists()) {
try {
- Files.delete(receiverFileDirWithIdSuffix.get().toPath());
+ FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
LOGGER.info(
- "IoTDBFileReceiverV1#handleExit: original receiver file dir {}
was deleted.",
+ "Receiver id = {}: Handling exit: Original receiver file dir {}
was deleted.",
+ receiverId.get(),
receiverFileDirWithIdSuffix.get().getPath());
} catch (IOException e) {
LOGGER.warn(
- "IoTDBFileReceiverV1#handleExit: delete original receiver file
dir {} error.",
- receiverFileDirWithIdSuffix.get().getPath());
+ "Receiver id = {}: Handling exit: Delete original receiver file
dir {} error.",
+ receiverId.get(),
+ receiverFileDirWithIdSuffix.get().getPath(),
+ e);
}
} else {
- LOGGER.info(
- "IoTDBFileReceiverV1#handleExit: original receiver file dir {}
does not exist. No need to delete.",
- receiverFileDirWithIdSuffix.get().getPath());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Receiver id = {}: Handling exit: Original receiver file dir {}
does not exist. No need to delete.",
+ receiverId.get(),
+ receiverFileDirWithIdSuffix.get().getPath());
+ }
}
receiverFileDirWithIdSuffix.set(null);
} else {
- LOGGER.info(
- "IoTDBFileReceiverV1#handleExit: original receiver file dir is null.
No need to delete.");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Receiver id = {}: Handling exit: Original receiver file dir is
null. No need to delete.",
+ receiverId.get());
+ }
}
- LOGGER.info("IoTDBFileReceiverV1#handleExit: receiver exited.");
+ LOGGER.info("Receiver id = {}: Handling exit: Receiver exited.",
receiverId.get());
}
}