This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/mpp_issues
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/mpp_issues by this
push:
new a10332bc38 fix the issue that deadlock may occur in QueryTerminator
a10332bc38 is described below
commit a10332bc38c63e330ddd88d909740c101cf02458
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Tue Apr 19 16:35:56 2022 +0800
fix the issue that deadlock may occur in QueryTerminator
---
.../apache/iotdb/db/mpp/execution/Coordinator.java | 4 +-
.../iotdb/db/mpp/execution/QueryExecution.java | 3 ++
.../mpp/execution/scheduler/ClusterScheduler.java | 4 +-
.../mpp/execution/scheduler/IQueryTerminator.java | 4 +-
.../execution/scheduler/SimpleQueryTerminator.java | 52 ++++++++++------------
5 files changed, 35 insertions(+), 32 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index ba2c602855..c9393c6be7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -48,9 +48,9 @@ public class Coordinator {
private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
- private static final int COORDINATOR_EXECUTOR_SIZE = 20;
+ private static final int COORDINATOR_EXECUTOR_SIZE = 1;
private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME =
"MPPCoordinatorScheduled";
- private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 2;
+ private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
private static final Endpoint LOCAL_HOST =
new Endpoint(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 2b2ea1f762..0a1528422a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -261,13 +261,16 @@ public class QueryExecution implements IQueryExecution {
SettableFuture<QueryState> future = SettableFuture.create();
stateMachine.addStateChangeListener(
state -> {
+ LOG.info("[QueryExecution {}]: wait status callback invoked", context.getQueryId());
if (state == QueryState.RUNNING || state.isDone()) {
future.set(state);
}
});
try {
+ LOG.info("[QueryExecution {}]: start to wait status", context.getQueryId());
QueryState state = future.get();
+ LOG.info("[QueryExecution {}]: status got", context.getQueryId());
// TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
TSStatusCode statusCode =
// For WRITE, the state should be FINISHED; For READ, the state could be RUNNING
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 694a82738f..1f3ddd87a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -95,7 +95,7 @@ public class ClusterScheduler implements IScheduler {
try {
LOGGER.info("[{}] wait dispatch to be finished", queryContext.getQueryId());
FragInstanceDispatchResult result = dispatchResultFuture.get();
- LOGGER.info("[{}] dispatch finished: ", result.isSuccessful());
+ LOGGER.info("[{}] dispatch finished: {}", queryContext.getQueryId(), result.isSuccessful());
if (!result.isSuccessful()) {
stateMachine.transitionToFailed(new IllegalStateException("Fragment cannot be dispatched"));
return;
@@ -108,7 +108,9 @@ public class ClusterScheduler implements IScheduler {
// For the FragmentInstance of WRITE, it will be executed directly when dispatching.
if (queryType == QueryType.WRITE) {
+ LOGGER.info("[{}] prepare to transition WRITE to finished", queryContext.getQueryId());
stateMachine.transitionToFinished();
+ LOGGER.info("[{}] transition done", queryContext.getQueryId());
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java
index 25ff71363b..29a1e27cfd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IQueryTerminator.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.execution.scheduler;
+import java.util.concurrent.Future;
+
public interface IQueryTerminator {
- boolean terminate();
+ Future<Boolean> terminate();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 728966daea..236a118449 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -27,15 +27,16 @@ import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
public class SimpleQueryTerminator implements IQueryTerminator {
-
+ private static final Logger LOGGER = LoggerFactory.getLogger(SimpleQueryTerminator.class);
private final ExecutorService executor;
private final QueryId queryId;
private final List<FragmentInstance> fragmentInstances;
@@ -48,33 +49,28 @@ public class SimpleQueryTerminator implements IQueryTerminator {
}
@Override
- public boolean terminate() {
+ public Future<Boolean> terminate() {
+ LOGGER.info("[{}] start to submit terminate command", queryId);
List<Endpoint> relatedHost = getRelatedHost(fragmentInstances);
- Future<Boolean> future =
- executor.submit(
- () -> {
- try {
- for (Endpoint endpoint : relatedHost) {
- // TODO (jackie tien) change the port
- InternalService.Iface client =
- InternalServiceClientFactory.getInternalServiceClient(
- new Endpoint(
- endpoint.getIp(),
- IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
- client.cancelQuery(new TCancelQueryReq(queryId.getId()));
- }
- } catch (TException e) {
- return false;
- }
- return true;
- });
- try {
- return future.get();
- } catch (InterruptedException | ExecutionException e) {
- // TODO: (xingtanzjr) Record the error info with logger
- Thread.currentThread().interrupt();
- return false;
- }
+
+ return executor.submit(
+ () -> {
+ try {
+ for (Endpoint endpoint : relatedHost) {
+ // TODO (jackie tien) change the port
+ InternalService.Iface client =
+ InternalServiceClientFactory.getInternalServiceClient(
+ new Endpoint(
+ endpoint.getIp(),
+ IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+ client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+ LOGGER.info("[{}] cancel query from DataNode[{}]", queryId, endpoint.getIp());
+ }
+ } catch (TException e) {
+ return false;
+ }
+ return true;
+ });
}
private List<Endpoint> getRelatedHost(List<FragmentInstance> instances) {