This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f33ee814b8 [IOTDB-2890] Dynamic port support for DataBlockManager 
(#5491)
f33ee814b8 is described below

commit f33ee814b820a8a237ff2244a00fb48cdc378414
Author: Zhong Wang <[email protected]>
AuthorDate: Tue Apr 12 20:53:13 2022 +0800

    [IOTDB-2890] Dynamic port support for DataBlockManager (#5491)
---
 .../main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java | 7 ++++---
 .../java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java     | 5 +++++
 .../java/org/apache/iotdb/db/mpp/execution/QueryExecution.java     | 1 +
 .../org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java | 2 ++
 4 files changed, 12 insertions(+), 3 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index 00e955e9a4..5f06947a8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -246,6 +246,7 @@ public class DataBlockManager implements IDataBlockManager {
   public ISinkHandle createSinkHandle(
       TFragmentInstanceId localFragmentInstanceId,
       String remoteHostname,
+      int remotePort,
       TFragmentInstanceId remoteFragmentInstanceId,
       String remotePlanNodeId)
       throws IOException {
@@ -267,7 +268,7 @@ public class DataBlockManager implements IDataBlockManager {
             localFragmentInstanceId,
             localMemoryManager,
             executorService,
-            clientFactory.getDataBlockServiceClient(remoteHostname, 7777),
+            clientFactory.getDataBlockServiceClient(remoteHostname, 
remotePort),
             tsBlockSerdeFactory.get(),
             new SinkHandleListenerImpl());
     sinkHandles.put(localFragmentInstanceId, sinkHandle);
@@ -279,6 +280,7 @@ public class DataBlockManager implements IDataBlockManager {
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
       String remoteHostname,
+      int remotePort,
       TFragmentInstanceId remoteFragmentInstanceId)
       throws IOException {
     if (sourceHandles.containsKey(localFragmentInstanceId)
@@ -305,8 +307,7 @@ public class DataBlockManager implements IDataBlockManager {
             localPlanNodeId,
             localMemoryManager,
             executorService,
-            // TODO: hard coded port.
-            clientFactory.getDataBlockServiceClient(remoteHostname, 7777),
+            clientFactory.getDataBlockServiceClient(remoteHostname, 
remotePort),
             tsBlockSerdeFactory.get(),
             new SourceHandleListenerImpl());
     sourceHandles
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
index c8d6ba714e..e69a4a6b9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
@@ -34,12 +34,14 @@ public interface IDataBlockManager {
    *     blocks to the sink handle.
    * @param remoteHostname Hostname of the remote fragment instance where the 
data blocks should be
    *     sent to.
+   * @param remotePort Port of the remote fragment instance where the data 
blocks should be sent to.
    * @param remoteFragmentInstanceId ID of the remote fragment instance.
    * @param remotePlanNodeId The sink plan node ID of the remote fragment 
instance.
    */
   ISinkHandle createSinkHandle(
       TFragmentInstanceId localFragmentInstanceId,
       String remoteHostname,
+      int remotePort,
       TFragmentInstanceId remoteFragmentInstanceId,
       String remotePlanNodeId)
       throws TTransportException, IOException;
@@ -53,12 +55,15 @@ public interface IDataBlockManager {
    * @param localPlanNodeId The local sink plan node ID.
    * @param remoteHostname Hostname of the remote fragment instance where the 
data blocks should be
    *     received from.
+   * @param remotePort Port of the remote fragment instance where the data 
blocks should be received
+   *     from.
    * @param remoteFragmentInstanceId ID of the remote fragment instance.
    */
   ISourceHandle createSourceHandle(
       TFragmentInstanceId localFragmentInstanceId,
       String localPlanNodeId,
       String remoteHostname,
+      int remotePort,
       TFragmentInstanceId remoteFragmentInstanceId)
       throws IOException;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 9b89162df0..a5dbeaf21f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -230,6 +230,7 @@ public class QueryExecution implements IQueryExecution {
                   
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
                   
context.getResultNodeContext().getVirtualResultNodeId().getId(),
                   context.getResultNodeContext().getUpStreamEndpoint().getIp(),
+                  
context.getResultNodeContext().getUpStreamEndpoint().getPort(),
                   
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift());
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index f38fb4b332..89112f2deb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -245,6 +245,7 @@ public class LocalExecutionPlanner {
                 localInstanceId.toThrift(),
                 node.getPlanNodeId().getId(),
                 source.getIp(),
+                source.getPort(),
                 remoteInstanceId.toThrift());
         return new ExchangeOperator(operatorContext, sourceHandle, 
node.getUpstreamPlanNodeId());
       } catch (IOException e) {
@@ -263,6 +264,7 @@ public class LocalExecutionPlanner {
             DATA_BLOCK_MANAGER.createSinkHandle(
                 localInstanceId.toThrift(),
                 target.getIp(),
+                target.getPort(),
                 targetInstanceId.toThrift(),
                 node.getDownStreamPlanNodeId().getId());
         context.setSinkHandle(sinkHandle);

Reply via email to