This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 38473d1a00 Fix the issue that the FragmentInstance may sometimes get stuck
in the FLUSHING state (#5808)
38473d1a00 is described below
commit 38473d1a0077b59300c3bc552b744ae721309bdf
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Fri May 6 12:45:42 2022 +0800
Fix the issue that the FragmentInstance may sometimes get stuck in the FLUSHING
state (#5808)
---
.../execution/datatransfer/DataBlockManager.java | 19 ++++++++-------
.../mpp/execution/datatransfer/SourceHandle.java | 4 ++--
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 4 ++--
.../db/mpp/plan/execution/QueryExecution.java | 4 +++-
.../mpp/plan/scheduler/SimpleQueryTerminator.java | 28 ++++++++++++++++------
.../service/thrift/impl/InternalServiceImpl.java | 12 ++++++----
thrift/src/main/thrift/mpp.thrift | 1 +
7 files changed, 48 insertions(+), 24 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
index c889f18f6d..8cfda3c40b 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/DataBlockManager.java
@@ -106,10 +106,10 @@ public class DataBlockManager implements
IDataBlockManager {
e.getEndSequenceId(),
e.getSourceFragmentInstanceId());
if (!sinkHandles.containsKey(e.getSourceFragmentInstanceId())) {
- throw new TException(
- "Source fragment instance not found. Fragment instance ID: "
- + e.getSourceFragmentInstanceId()
- + ".");
+ logger.warn(
+ "received ACK event but target FragmentInstance[{}] is not found.",
+ e.getSourceFragmentInstanceId());
+ return;
}
((SinkHandle) sinkHandles.get(e.getSourceFragmentInstanceId()))
.acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
@@ -130,10 +130,13 @@ public class DataBlockManager implements
IDataBlockManager {
.get(e.getTargetFragmentInstanceId())
.get(e.getTargetPlanNodeId())
.isAborted()) {
- throw new TException(
- "Target fragment instance not found. Fragment instance ID: "
- + e.getTargetFragmentInstanceId()
- + ".");
+ // In some scenario, when the SourceHandle sends the data block ACK
event, its upstream may
+ // have already been stopped. For example, in the query whit
LimitOperator, the downstream
+ // FragmentInstance may be finished, although the upstream is still
working.
+ logger.warn(
+ "received NewDataBlockEvent but the upstream FragmentInstance[{}]
is not found",
+ e.getTargetFragmentInstanceId());
+ return;
}
SourceHandle sourceHandle =
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
index 1c8bf70880..2102e98bfb 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/datatransfer/SourceHandle.java
@@ -320,6 +320,8 @@ public class SourceHandle implements ISourceHandle {
TsBlock tsBlock = serde.deserialize(byteBuffer);
tsBlocks.add(tsBlock);
}
+ executorService.submit(
+ new SendAcknowledgeDataBlockEventTask(startSequenceId,
endSequenceId));
synchronized (SourceHandle.this) {
if (aborted) {
return;
@@ -331,8 +333,6 @@ public class SourceHandle implements ISourceHandle {
blocked.set(null);
}
}
- executorService.submit(
- new SendAcknowledgeDataBlockEventTask(startSequenceId,
endSequenceId));
break;
} catch (Throwable e) {
logger.error(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 08d8257560..275501eaf7 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -107,6 +107,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher
{
}
SchemaTree result = new SchemaTree();
while (coordinator.getQueryExecution(queryId).hasNextResult()) {
+ // The query will be transited to FINISHED when invoking
getBatchResult() at the last time
+ // So we don't need to clean up it manually
Optional<TsBlock> tsBlock =
coordinator.getQueryExecution(queryId).getBatchResult();
if (!tsBlock.isPresent()) {
break;
@@ -121,8 +123,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher
{
result.mergeSchemaTree(fetchedSchemaTree);
}
}
- // TODO: (xingtanzjr) need to release this query's resource here. This is
a temporary way
- coordinator.getQueryExecution(queryId).stopAndCleanup();
return result;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 763ca26a82..7f6078087f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -238,7 +238,9 @@ public class QueryExecution implements IQueryExecution {
ListenableFuture<Void> blocked = resultHandle.isBlocked();
blocked.get();
if (resultHandle.isFinished()) {
- releaseResource();
+ // Once the resultHandle is finished, we should transit the state of
this query to FINISHED.
+ // So that the corresponding cleanup work could be triggered.
+ stateMachine.transitionToFinished();
return Optional.empty();
}
return Optional.of(resultHandle.receive());
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index df6f08f7c2..357c26fcf5 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -37,7 +38,8 @@ import java.util.concurrent.Future;
import java.util.stream.Collectors;
public class SimpleQueryTerminator implements IQueryTerminator {
- private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleQueryTerminator.class);
+ private static final Logger logger =
LoggerFactory.getLogger(SimpleQueryTerminator.class);
+ private static final long TERMINATION_GRACE_PERIOD_IN_MS = 1000L;
private final ExecutorService executor;
private final QueryId queryId;
private final List<FragmentInstance> fragmentInstances;
@@ -58,17 +60,22 @@ public class SimpleQueryTerminator implements
IQueryTerminator {
@Override
public Future<Boolean> terminate() {
- List<TEndPoint> relatedHost = getRelatedHost(fragmentInstances);
-
+ List<TEndPoint> relatedHost = getRelatedHost();
+ try {
+ Thread.sleep(TERMINATION_GRACE_PERIOD_IN_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
return executor.submit(
() -> {
for (TEndPoint endPoint : relatedHost) {
// TODO (jackie tien) change the port
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
- client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+ client.cancelQuery(
+ new TCancelQueryReq(queryId.getId(),
getRelatedFragmentInstances(endPoint)));
} catch (IOException e) {
- LOGGER.error("can't connect to node {}", endPoint, e);
+ logger.error("can't connect to node {}", endPoint, e);
return false;
} catch (TException e) {
return false;
@@ -78,10 +85,17 @@ public class SimpleQueryTerminator implements
IQueryTerminator {
});
}
- private List<TEndPoint> getRelatedHost(List<FragmentInstance> instances) {
- return instances.stream()
+ private List<TEndPoint> getRelatedHost() {
+ return fragmentInstances.stream()
.map(instance -> instance.getHostDataNode().internalEndPoint)
.distinct()
.collect(Collectors.toList());
}
+
+ private List<TFragmentInstanceId> getRelatedFragmentInstances(TEndPoint
endPoint) {
+ return fragmentInstances.stream()
+ .filter(instance ->
instance.getHostDataNode().internalEndPoint.equals(endPoint))
+ .map(instance -> instance.getId().toThrift())
+ .collect(Collectors.toList());
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index f8b5778611..e2a1fff0c3 100644
---
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -72,6 +72,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
public class InternalServiceImpl implements InternalService.Iface {
@@ -132,11 +133,14 @@ public class InternalServiceImpl implements
InternalService.Iface {
@Override
public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
-
- // TODO need to be implemented and currently in order not to print
NotImplementedException log,
- // we simply return null
+ List<FragmentInstanceId> taskIds =
+ req.getFragmentInstanceIds().stream()
+ .map(FragmentInstanceId::fromThrift)
+ .collect(Collectors.toList());
+ for (FragmentInstanceId taskId : taskIds) {
+ FragmentInstanceManager.getInstance().cancelTask(taskId);
+ }
return new TCancelResp(true);
- // throw new NotImplementedException();
}
@Override
diff --git a/thrift/src/main/thrift/mpp.thrift
b/thrift/src/main/thrift/mpp.thrift
index 0c2acf6cb2..1d33d58cc4 100644
--- a/thrift/src/main/thrift/mpp.thrift
+++ b/thrift/src/main/thrift/mpp.thrift
@@ -105,6 +105,7 @@ struct TFragmentInstanceStateResp {
struct TCancelQueryReq {
1: required string queryId
+ 2: required list<TFragmentInstanceId> fragmentInstanceIds
}
struct TCancelPlanFragmentReq {