This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a9c55474839 Pipe: add information about sender's IP and port in the
pipe receiver logs (#14343)
a9c55474839 is described below
commit a9c55474839b32ea1bfc5ed4118dd22400890e6c
Author: nanxiang xia <[email protected]>
AuthorDate: Fri Dec 6 16:48:59 2024 +0800
Pipe: add information about sender's IP and port in the pipe receiver logs
(#14343)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/receiver/protocol/IoTDBConfigNodeReceiver.java | 16 ++++++++++++++++
.../receiver/protocol/thrift/IoTDBDataNodeReceiver.java | 12 ++++++++++++
.../apache/iotdb/db/protocol/session/IClientSession.java | 2 +-
.../iotdb/db/protocol/session/RestClientSession.java | 2 +-
.../iotdb/commons/pipe/receiver/IoTDBFileReceiver.java | 8 +++++++-
5 files changed, 37 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 4da078d2db0..5a67005110a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -69,6 +69,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.confignode.service.ConfigNode;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -91,6 +93,8 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBConfigNodeReceiver.class);
+ private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
+
private static final AtomicInteger QUERY_ID_GENERATOR = new AtomicInteger(0);
private static final PipeConfigPhysicalPlanTSStatusVisitor STATUS_VISITOR =
@@ -329,6 +333,18 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
return
ConfigNodeDescriptor.getInstance().getConf().getPipeReceiverFileDir();
}
+ @Override
+ protected String getSenderHost() {
+ final IClientSession session = SESSION_MANAGER.getCurrSession();
+ return session != null ? session.getClientAddress() : "unknown";
+ }
+
+ @Override
+ protected String getSenderPort() {
+ final IClientSession session = SESSION_MANAGER.getCurrSession();
+ return session != null ? String.valueOf(session.getClientPort()) :
"unknown";
+ }
+
@Override
protected TSStatus loadFileV1(
final PipeTransferFileSealReqV1 req, final String fileAbsolutePath) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index c6ec711eb10..39acd678592 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -484,6 +484,18 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return Objects.isNull(folderManager) ? null :
folderManager.getNextFolder();
}
+ @Override
+ protected String getSenderHost() {
+ final IClientSession session = SESSION_MANAGER.getCurrSession();
+ return session != null ? session.getClientAddress() : "unknown";
+ }
+
+ @Override
+ protected String getSenderPort() {
+ final IClientSession session = SESSION_MANAGER.getCurrSession();
+ return session != null ? String.valueOf(session.getClientPort()) :
"unknown";
+ }
+
@Override
protected TSStatus loadFileV1(final PipeTransferFileSealReqV1 req, final
String fileAbsolutePath)
throws IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
index 94f47c0434a..20c1fc0fdce 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
@@ -56,7 +56,7 @@ public abstract class IClientSession {
public abstract String getClientAddress();
- abstract int getClientPort();
+ public abstract int getClientPort();
abstract TSConnectionType getConnectionType();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java
index 30ca7509d60..fa830ace3fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java
@@ -38,7 +38,7 @@ public class RestClientSession extends IClientSession {
}
@Override
- int getClientPort() {
+ public int getClientPort() {
return 0;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 477ef13dfa8..1ab1be21454 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -174,14 +174,20 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
receiverFileDirWithIdSuffix.set(newReceiverDir);
LOGGER.info(
- "Receiver id = {}: Handshake successfully, receiver file dir = {}.",
+ "Receiver id = {}: Handshake successfully! Sender's host = {}, port = {}. Receiver's file dir = {}.",
receiverId.get(),
+ getSenderHost(),
+ getSenderPort(),
newReceiverDir.getPath());
return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
}
protected abstract String getReceiverFileBaseDir() throws Exception;
+ protected abstract String getSenderHost();
+
+ protected abstract String getSenderPort();
+
protected TPipeTransferResp handleTransferHandshakeV2(final
PipeTransferHandshakeV2Req req)
throws IOException {
// Reject to handshake if the receiver can not take clusterId from config node.