This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f33ee814b8 [IOTDB-2890] Dynamic port support for DataBlockManager (#5491)
f33ee814b8 is described below
commit f33ee814b820a8a237ff2244a00fb48cdc378414
Author: Zhong Wang <[email protected]>
AuthorDate: Tue Apr 12 20:53:13 2022 +0800
[IOTDB-2890] Dynamic port support for DataBlockManager (#5491)
---
.../main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java | 7 ++++---
.../java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java | 5 +++++
.../java/org/apache/iotdb/db/mpp/execution/QueryExecution.java | 1 +
.../org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java | 2 ++
4 files changed, 12 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index 00e955e9a4..5f06947a8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -246,6 +246,7 @@ public class DataBlockManager implements IDataBlockManager {
public ISinkHandle createSinkHandle(
TFragmentInstanceId localFragmentInstanceId,
String remoteHostname,
+ int remotePort,
TFragmentInstanceId remoteFragmentInstanceId,
String remotePlanNodeId)
throws IOException {
@@ -267,7 +268,7 @@ public class DataBlockManager implements IDataBlockManager {
localFragmentInstanceId,
localMemoryManager,
executorService,
- clientFactory.getDataBlockServiceClient(remoteHostname, 7777),
+ clientFactory.getDataBlockServiceClient(remoteHostname, remotePort),
tsBlockSerdeFactory.get(),
new SinkHandleListenerImpl());
sinkHandles.put(localFragmentInstanceId, sinkHandle);
@@ -279,6 +280,7 @@ public class DataBlockManager implements IDataBlockManager {
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
String remoteHostname,
+ int remotePort,
TFragmentInstanceId remoteFragmentInstanceId)
throws IOException {
if (sourceHandles.containsKey(localFragmentInstanceId)
@@ -305,8 +307,7 @@ public class DataBlockManager implements IDataBlockManager {
localPlanNodeId,
localMemoryManager,
executorService,
- // TODO: hard coded port.
- clientFactory.getDataBlockServiceClient(remoteHostname, 7777),
+ clientFactory.getDataBlockServiceClient(remoteHostname, remotePort),
tsBlockSerdeFactory.get(),
new SourceHandleListenerImpl());
sourceHandles
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
index c8d6ba714e..e69a4a6b9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
@@ -34,12 +34,14 @@ public interface IDataBlockManager {
* blocks to the sink handle.
* @param remoteHostname Hostname of the remote fragment instance where the data blocks should be
* sent to.
+ * @param remotePort Port of the remote fragment instance where the data blocks should be sent to.
* @param remoteFragmentInstanceId ID of the remote fragment instance.
* @param remotePlanNodeId The sink plan node ID of the remote fragment instance.
*/
ISinkHandle createSinkHandle(
TFragmentInstanceId localFragmentInstanceId,
String remoteHostname,
+ int remotePort,
TFragmentInstanceId remoteFragmentInstanceId,
String remotePlanNodeId)
throws TTransportException, IOException;
@@ -53,12 +55,15 @@ public interface IDataBlockManager {
* @param localPlanNodeId The local sink plan node ID.
* @param remoteHostname Hostname of the remote fragment instance where the data blocks should be
* received from.
+ * @param remotePort Port of the remote fragment instance where the data blocks should be received
+ * from.
* @param remoteFragmentInstanceId ID of the remote fragment instance.
*/
ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
String remoteHostname,
+ int remotePort,
TFragmentInstanceId remoteFragmentInstanceId)
throws IOException;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 9b89162df0..a5dbeaf21f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -230,6 +230,7 @@ public class QueryExecution implements IQueryExecution {
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
context.getResultNodeContext().getVirtualResultNodeId().getId(),
context.getResultNodeContext().getUpStreamEndpoint().getIp(),
+ context.getResultNodeContext().getUpStreamEndpoint().getPort(),
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index f38fb4b332..89112f2deb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -245,6 +245,7 @@ public class LocalExecutionPlanner {
localInstanceId.toThrift(),
node.getPlanNodeId().getId(),
source.getIp(),
+ source.getPort(),
remoteInstanceId.toThrift());
return new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
} catch (IOException e) {
@@ -263,6 +264,7 @@ public class LocalExecutionPlanner {
DATA_BLOCK_MANAGER.createSinkHandle(
localInstanceId.toThrift(),
target.getIp(),
+ target.getPort(),
targetInstanceId.toThrift(),
node.getDownStreamPlanNodeId().getId());
context.setSinkHandle(sinkHandle);