This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 51acf53822 Support LocalSinkHandle and LocalSourceHandle if two FI in the same DataNode (#6193)
51acf53822 is described below

commit 51acf53822f6e2586681851429ffbff6fed3e90d
Author: Jackie Tien <[email protected]>
AuthorDate: Mon Jun 13 15:19:59 2022 +0800

    Support LocalSinkHandle and LocalSourceHandle if two FI in the same DataNode (#6193)
---
 .../execution/datatransfer/SharedTsBlockQueue.java |  2 +-
 .../fragment/FragmentInstanceExecution.java        | 25 +++++++------
 .../fragment/FragmentInstanceStateMachine.java     |  7 +++-
 .../db/mpp/plan/constant/DataNodeEndPoints.java    |  4 +++
 .../db/mpp/plan/execution/QueryExecution.java      | 32 +++++++++++------
 .../db/mpp/plan/planner/LocalExecutionPlanner.java | 42 +++++++++++++++-------
 .../datatransfer/LocalSourceHandleTest.java        |  2 +-
 7 files changed, 77 insertions(+), 37 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
index e81f3dcbc5..3d991b1aa9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
@@ -100,7 +100,7 @@ public class SharedTsBlockQueue {
         .getQueryPool()
         .free(localFragmentInstanceId.getQueryId(), 
tsBlock.getRetainedSizeInBytes());
     bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
-    if (blocked.isDone() && queue.isEmpty()) {
+    if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
       blocked = SettableFuture.create();
     }
     return tsBlock;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 15da58fda1..bf4a119399 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.execution.driver.IDriver;
 import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
 
 import com.google.common.collect.ImmutableList;
+import io.airlift.concurrent.SetThreadName;
 import io.airlift.stats.CounterStat;
 
 import static java.util.Objects.requireNonNull;
@@ -102,18 +103,20 @@ public class FragmentInstanceExecution {
     requireNonNull(failedInstances, "failedInstances is null");
     stateMachine.addStateChangeListener(
         newState -> {
-          if (!newState.isDone()) {
-            return;
+          try (SetThreadName threadName = new 
SetThreadName(instanceId.getFullId())) {
+            if (!newState.isDone()) {
+              return;
+            }
+
+            // Update failed tasks counter
+            if (newState == FAILED) {
+              failedInstances.update(1);
+            }
+
+            driver.close();
+            sinkHandle.abort();
+            scheduler.abortFragmentInstance(instanceId);
           }
-
-          // Update failed tasks counter
-          if (newState == FAILED) {
-            failedInstances.update(1);
-          }
-
-          driver.close();
-          sinkHandle.abort();
-          scheduler.abortFragmentInstance(instanceId);
         });
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
index e908120dd6..699777d8f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListenableFuture;
+import io.airlift.concurrent.SetThreadName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +76,11 @@ public class FragmentInstanceStateMachine {
         new StateMachine<>(
             "FragmentInstance " + fragmentInstanceId, executor, RUNNING, 
TERMINAL_INSTANCE_STATES);
     instanceState.addStateChangeListener(
-        newState -> LOGGER.debug("Fragment Instance {} is {}", 
fragmentInstanceId, newState));
+        newState -> {
+          try (SetThreadName threadName = new 
SetThreadName(fragmentInstanceId.getFullId())) {
+            LOGGER.info("State transfer to {}", newState);
+          }
+        });
   }
 
   public long getCreatedTime() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/DataNodeEndPoints.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/DataNodeEndPoints.java
index 595dcdec90..15225eb3c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/DataNodeEndPoints.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/DataNodeEndPoints.java
@@ -32,4 +32,8 @@ public class DataNodeEndPoints {
       new TEndPoint(
           IoTDBDescriptor.getInstance().getConfig().getInternalIp(),
           IoTDBDescriptor.getInstance().getConfig().getInternalPort());
+
+  public static boolean isSameNode(TEndPoint endPoint) {
+    return endPoint.equals(LOCAL_HOST_DATA_BLOCK_ENDPOINT);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 224cbacd1a..9453ce73a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -74,6 +74,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static com.google.common.base.Throwables.throwIfUnchecked;
+import static 
org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
 
 /**
  * QueryExecution stores all the status of a query which is being prepared or 
running inside the MPP
@@ -292,8 +293,9 @@ public class QueryExecution implements IQueryExecution {
       }
     } catch (ExecutionException | CancellationException e) {
       stateMachine.transitionToFailed(e);
-      throwIfUnchecked(e.getCause());
-      throw new RuntimeException(e.getCause());
+      Throwable t = e.getCause() == null ? e : e.getCause();
+      throwIfUnchecked(t);
+      throw new RuntimeException(t);
     } catch (InterruptedException e) {
       stateMachine.transitionToFailed(e);
       Thread.currentThread().interrupt();
@@ -352,15 +354,25 @@ public class QueryExecution implements IQueryExecution {
 
   private void initResultHandle() {
     if (this.resultHandle == null) {
+      TEndPoint upstreamEndPoint = 
context.getResultNodeContext().getUpStreamEndpoint();
+
       this.resultHandle =
-          DataBlockService.getInstance()
-              .getDataBlockManager()
-              .createSourceHandle(
-                  
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
-                  
context.getResultNodeContext().getVirtualResultNodeId().getId(),
-                  context.getResultNodeContext().getUpStreamEndpoint(),
-                  
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
-                  stateMachine::transitionToFailed);
+          isSameNode(upstreamEndPoint)
+              ? DataBlockService.getInstance()
+                  .getDataBlockManager()
+                  .createLocalSourceHandle(
+                      
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+                      
context.getResultNodeContext().getVirtualResultNodeId().getId(),
+                      
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
+                      stateMachine::transitionToFailed)
+              : DataBlockService.getInstance()
+                  .getDataBlockManager()
+                  .createSourceHandle(
+                      
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+                      
context.getResultNodeContext().getVirtualResultNodeId().getId(),
+                      upstreamEndPoint,
+                      
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
+                      stateMachine::transitionToFailed);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index e5b01ff8a7..17406785cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.plan.planner;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -178,6 +179,7 @@ import java.util.stream.Collectors;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
+import static 
org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
 
 /**
  * Used to plan a fragment instance. Currently, we simply change it from 
PlanNode to executable
@@ -1050,17 +1052,24 @@ public class LocalExecutionPlanner {
           context.instanceContext.addOperatorContext(
               context.getNextOperatorId(),
               node.getPlanNodeId(),
-              SeriesScanOperator.class.getSimpleName());
+              ExchangeOperator.class.getSimpleName());
       FragmentInstanceId localInstanceId = context.instanceContext.getId();
       FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId();
 
+      TEndPoint upstreamEndPoint = node.getUpstreamEndpoint();
       ISourceHandle sourceHandle =
-          DATA_BLOCK_MANAGER.createSourceHandle(
-              localInstanceId.toThrift(),
-              node.getPlanNodeId().getId(),
-              node.getUpstreamEndpoint(),
-              remoteInstanceId.toThrift(),
-              context.instanceContext::failed);
+          isSameNode(upstreamEndPoint)
+              ? DATA_BLOCK_MANAGER.createLocalSourceHandle(
+                  localInstanceId.toThrift(),
+                  node.getPlanNodeId().getId(),
+                  remoteInstanceId.toThrift(),
+                  context.instanceContext::failed)
+              : DATA_BLOCK_MANAGER.createSourceHandle(
+                  localInstanceId.toThrift(),
+                  node.getPlanNodeId().getId(),
+                  upstreamEndPoint,
+                  remoteInstanceId.toThrift(),
+                  context.instanceContext::failed);
       return new ExchangeOperator(operatorContext, sourceHandle, 
node.getUpstreamPlanNodeId());
     }
 
@@ -1070,16 +1079,23 @@ public class LocalExecutionPlanner {
 
       FragmentInstanceId localInstanceId = context.instanceContext.getId();
       FragmentInstanceId targetInstanceId = node.getDownStreamInstanceId();
+      TEndPoint downStreamEndPoint = node.getDownStreamEndpoint();
 
       checkArgument(DATA_BLOCK_MANAGER != null, "DATA_BLOCK_MANAGER should not 
be null");
 
       ISinkHandle sinkHandle =
-          DATA_BLOCK_MANAGER.createSinkHandle(
-              localInstanceId.toThrift(),
-              node.getDownStreamEndpoint(),
-              targetInstanceId.toThrift(),
-              node.getDownStreamPlanNodeId().getId(),
-              context.instanceContext);
+          isSameNode(downStreamEndPoint)
+              ? DATA_BLOCK_MANAGER.createLocalSinkHandle(
+                  localInstanceId.toThrift(),
+                  targetInstanceId.toThrift(),
+                  node.getDownStreamPlanNodeId().getId(),
+                  context.instanceContext)
+              : DATA_BLOCK_MANAGER.createSinkHandle(
+                  localInstanceId.toThrift(),
+                  downStreamEndPoint,
+                  targetInstanceId.toThrift(),
+                  node.getDownStreamPlanNodeId().getId(),
+                  context.instanceContext);
       context.setSinkHandle(sinkHandle);
       return child;
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandleTest.java
index 1fcdedc547..223b9385a7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandleTest.java
@@ -72,7 +72,7 @@ public class LocalSourceHandleTest {
     Assert.assertTrue(localSourceHandle.isBlocked().isDone());
     localSourceHandle.receive();
     ListenableFuture<Void> blocked = localSourceHandle.isBlocked();
-    Assert.assertFalse(blocked.isDone());
+    Assert.assertTrue(blocked.isDone());
     Assert.assertFalse(localSourceHandle.isAborted());
     Assert.assertTrue(localSourceHandle.isFinished());
     Mockito.verify(mockSourceHandleListener, 
Mockito.times(1)).onFinished(localSourceHandle);

Reply via email to