This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 51acf53822 Support LocalSinkHandle and LocalSourceHandle when two FIs are in the same DataNode (#6193)
51acf53822 is described below
commit 51acf53822f6e2586681851429ffbff6fed3e90d
Author: Jackie Tien <[email protected]>
AuthorDate: Mon Jun 13 15:19:59 2022 +0800
Support LocalSinkHandle and LocalSourceHandle when two fragment instances (FIs) are in the same DataNode (#6193)
---
.../execution/datatransfer/SharedTsBlockQueue.java | 2 +-
.../fragment/FragmentInstanceExecution.java | 25 +++++++------
.../fragment/FragmentInstanceStateMachine.java | 7 +++-
.../db/mpp/plan/constant/DataNodeEndPoints.java | 4 +++
.../db/mpp/plan/execution/QueryExecution.java | 32 +++++++++++------
.../db/mpp/plan/planner/LocalExecutionPlanner.java | 42 +++++++++++++++-------
.../datatransfer/LocalSourceHandleTest.java | 2 +-
7 files changed, 77 insertions(+), 37 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
index e81f3dcbc5..3d991b1aa9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SharedTsBlockQueue.java
@@ -100,7 +100,7 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(localFragmentInstanceId.getQueryId(),
tsBlock.getRetainedSizeInBytes());
bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
- if (blocked.isDone() && queue.isEmpty()) {
+ if (blocked.isDone() && queue.isEmpty() && !noMoreTsBlocks) {
blocked = SettableFuture.create();
}
return tsBlock;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 15da58fda1..bf4a119399 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.execution.driver.IDriver;
import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
import com.google.common.collect.ImmutableList;
+import io.airlift.concurrent.SetThreadName;
import io.airlift.stats.CounterStat;
import static java.util.Objects.requireNonNull;
@@ -102,18 +103,20 @@ public class FragmentInstanceExecution {
requireNonNull(failedInstances, "failedInstances is null");
stateMachine.addStateChangeListener(
newState -> {
- if (!newState.isDone()) {
- return;
+ try (SetThreadName threadName = new
SetThreadName(instanceId.getFullId())) {
+ if (!newState.isDone()) {
+ return;
+ }
+
+ // Update failed tasks counter
+ if (newState == FAILED) {
+ failedInstances.update(1);
+ }
+
+ driver.close();
+ sinkHandle.abort();
+ scheduler.abortFragmentInstance(instanceId);
}
-
- // Update failed tasks counter
- if (newState == FAILED) {
- failedInstances.update(1);
- }
-
- driver.close();
- sinkHandle.abort();
- scheduler.abortFragmentInstance(instanceId);
});
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
index e908120dd6..699777d8f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
+import io.airlift.concurrent.SetThreadName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +76,11 @@ public class FragmentInstanceStateMachine {
new StateMachine<>(
"FragmentInstance " + fragmentInstanceId, executor, RUNNING,
TERMINAL_INSTANCE_STATES);
instanceState.addStateChangeListener(
- newState -> LOGGER.debug("Fragment Instance {} is {}",
fragmentInstanceId, newState));
+ newState -> {
+ try (SetThreadName threadName = new
SetThreadName(fragmentInstanceId.getFullId())) {
+ LOGGER.info("State transfer to {}", newState);
+ }
+ });
}
public long getCreatedTime() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/DataNodeEndPoints.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/DataNodeEndPoints.java
index 595dcdec90..15225eb3c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/DataNodeEndPoints.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/constant/DataNodeEndPoints.java
@@ -32,4 +32,8 @@ public class DataNodeEndPoints {
new TEndPoint(
IoTDBDescriptor.getInstance().getConfig().getInternalIp(),
IoTDBDescriptor.getInstance().getConfig().getInternalPort());
+
+ public static boolean isSameNode(TEndPoint endPoint) {
+ return endPoint.equals(LOCAL_HOST_DATA_BLOCK_ENDPOINT);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 224cbacd1a..9453ce73a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -74,6 +74,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import static com.google.common.base.Throwables.throwIfUnchecked;
+import static
org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
/**
* QueryExecution stores all the status of a query which is being prepared or running inside the MPP
@@ -292,8 +293,9 @@ public class QueryExecution implements IQueryExecution {
}
} catch (ExecutionException | CancellationException e) {
stateMachine.transitionToFailed(e);
- throwIfUnchecked(e.getCause());
- throw new RuntimeException(e.getCause());
+ Throwable t = e.getCause() == null ? e : e.getCause();
+ throwIfUnchecked(t);
+ throw new RuntimeException(t);
} catch (InterruptedException e) {
stateMachine.transitionToFailed(e);
Thread.currentThread().interrupt();
@@ -352,15 +354,25 @@ public class QueryExecution implements IQueryExecution {
private void initResultHandle() {
if (this.resultHandle == null) {
+ TEndPoint upstreamEndPoint =
context.getResultNodeContext().getUpStreamEndpoint();
+
this.resultHandle =
- DataBlockService.getInstance()
- .getDataBlockManager()
- .createSourceHandle(
-
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
-
context.getResultNodeContext().getVirtualResultNodeId().getId(),
- context.getResultNodeContext().getUpStreamEndpoint(),
-
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
- stateMachine::transitionToFailed);
+ isSameNode(upstreamEndPoint)
+ ? DataBlockService.getInstance()
+ .getDataBlockManager()
+ .createLocalSourceHandle(
+
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+
context.getResultNodeContext().getVirtualResultNodeId().getId(),
+
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
+ stateMachine::transitionToFailed)
+ : DataBlockService.getInstance()
+ .getDataBlockManager()
+ .createSourceHandle(
+
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
+
context.getResultNodeContext().getVirtualResultNodeId().getId(),
+ upstreamEndPoint,
+
context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift(),
+ stateMachine::transitionToFailed);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index e5b01ff8a7..17406785cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.plan.planner;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -178,6 +179,7 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static
org.apache.iotdb.db.mpp.execution.operator.LastQueryUtil.satisfyFilter;
+import static
org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints.isSameNode;
/**
* Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
@@ -1050,17 +1052,24 @@ public class LocalExecutionPlanner {
context.instanceContext.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- SeriesScanOperator.class.getSimpleName());
+ ExchangeOperator.class.getSimpleName());
FragmentInstanceId localInstanceId = context.instanceContext.getId();
FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId();
+ TEndPoint upstreamEndPoint = node.getUpstreamEndpoint();
ISourceHandle sourceHandle =
- DATA_BLOCK_MANAGER.createSourceHandle(
- localInstanceId.toThrift(),
- node.getPlanNodeId().getId(),
- node.getUpstreamEndpoint(),
- remoteInstanceId.toThrift(),
- context.instanceContext::failed);
+ isSameNode(upstreamEndPoint)
+ ? DATA_BLOCK_MANAGER.createLocalSourceHandle(
+ localInstanceId.toThrift(),
+ node.getPlanNodeId().getId(),
+ remoteInstanceId.toThrift(),
+ context.instanceContext::failed)
+ : DATA_BLOCK_MANAGER.createSourceHandle(
+ localInstanceId.toThrift(),
+ node.getPlanNodeId().getId(),
+ upstreamEndPoint,
+ remoteInstanceId.toThrift(),
+ context.instanceContext::failed);
return new ExchangeOperator(operatorContext, sourceHandle,
node.getUpstreamPlanNodeId());
}
@@ -1070,16 +1079,23 @@ public class LocalExecutionPlanner {
FragmentInstanceId localInstanceId = context.instanceContext.getId();
FragmentInstanceId targetInstanceId = node.getDownStreamInstanceId();
+ TEndPoint downStreamEndPoint = node.getDownStreamEndpoint();
checkArgument(DATA_BLOCK_MANAGER != null, "DATA_BLOCK_MANAGER should not
be null");
ISinkHandle sinkHandle =
- DATA_BLOCK_MANAGER.createSinkHandle(
- localInstanceId.toThrift(),
- node.getDownStreamEndpoint(),
- targetInstanceId.toThrift(),
- node.getDownStreamPlanNodeId().getId(),
- context.instanceContext);
+ isSameNode(downStreamEndPoint)
+ ? DATA_BLOCK_MANAGER.createLocalSinkHandle(
+ localInstanceId.toThrift(),
+ targetInstanceId.toThrift(),
+ node.getDownStreamPlanNodeId().getId(),
+ context.instanceContext)
+ : DATA_BLOCK_MANAGER.createSinkHandle(
+ localInstanceId.toThrift(),
+ downStreamEndPoint,
+ targetInstanceId.toThrift(),
+ node.getDownStreamPlanNodeId().getId(),
+ context.instanceContext);
context.setSinkHandle(sinkHandle);
return child;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandleTest.java
index 1fcdedc547..223b9385a7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/datatransfer/LocalSourceHandleTest.java
@@ -72,7 +72,7 @@ public class LocalSourceHandleTest {
Assert.assertTrue(localSourceHandle.isBlocked().isDone());
localSourceHandle.receive();
ListenableFuture<Void> blocked = localSourceHandle.isBlocked();
- Assert.assertFalse(blocked.isDone());
+ Assert.assertTrue(blocked.isDone());
Assert.assertFalse(localSourceHandle.isAborted());
Assert.assertTrue(localSourceHandle.isFinished());
Mockito.verify(mockSourceHandleListener,
Mockito.times(1)).onFinished(localSourceHandle);