This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/query_log
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/query_log by this
push:
new c2d89d0f7d make interrupt() more suitable
c2d89d0f7d is described below
commit c2d89d0f7d0f035f9d457beb5fa4e83a210e85d2
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Thu May 5 12:25:09 2022 +0800
make interrupt() more suitable
---
.../statemachine/SchemaRegionStateMachine.java | 2 +-
.../mpp/plan/execution/config/ConfigExecution.java | 8 ++-
.../db/mpp/plan/planner/plan/FragmentInstance.java | 6 +--
.../db/mpp/plan/scheduler/ClusterScheduler.java | 3 ++
.../scheduler/SimpleFragInstanceDispatcher.java | 61 ++++++++++++----------
5 files changed, 46 insertions(+), 34 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 68277cce8a..4fbe1c4a0a 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -78,7 +78,7 @@ public class SchemaRegionStateMachine extends
BaseStateMachine {
@Override
protected DataSet read(FragmentInstance fragmentInstance) {
- logger.info("Execute read plan in SchemaRegionStateMachine");
+ logger.info("SchemaRegionStateMachine[{}]: Execute read plan:
FragmentInstance-{}", schemaRegion.getSchemaRegionId(),
fragmentInstance.getId());
return
QUERY_INSTANCE_MANAGER.execSchemaQueryFragmentInstance(fragmentInstance,
schemaRegion);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index eccb8e19d6..65e9e317d6 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -94,7 +94,9 @@ public class ConfigExecution implements IQueryExecution {
},
executor);
} catch (Throwable e) {
- Thread.currentThread().interrupt();
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
fail(e);
}
}
@@ -121,7 +123,9 @@ public class ConfigExecution implements IQueryExecution {
statusCode == TSStatusCode.SUCCESS_STATUS ? "" :
stateMachine.getFailureMessage();
return new ExecutionResult(context.getQueryId(),
RpcUtils.getStatus(statusCode, message));
} catch (InterruptedException | ExecutionException e) {
- Thread.currentThread().interrupt();
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
return new ExecutionResult(
context.getQueryId(),
RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR,
e.getMessage()));
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index dbaae2471e..f6011c4a85 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -116,12 +116,12 @@ public class FragmentInstance implements
IConsensusRequest {
ret.append(String.format("FragmentInstance-%s:", getId()));
ret.append(
String.format(
- "Host: %s", getHostDataNode() == null ? "Not set" :
getHostDataNode().dataNodeId));
+ "Host: %s ", getHostDataNode() == null ? "Not set" :
getHostDataNode().dataNodeId));
ret.append(
String.format(
- "Region: %s",
+ "Region: %s ",
getRegionReplicaSet() == null ? "Not set" :
getRegionReplicaSet().getRegionId()));
- ret.append("---- Plan Node Tree ----\n");
+ ret.append("\n---- Plan Node Tree ----\n");
ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
return ret.toString();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index db4bcfdb84..ced23a70ac 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -98,6 +98,7 @@ public class ClusterScheduler implements IScheduler {
try {
FragInstanceDispatchResult result = dispatchResultFuture.get();
if (!result.isSuccessful()) {
+ logger.error("{} dispatch failed.", getLogHeader());
stateMachine.transitionToFailed(new IllegalStateException("Fragment
cannot be dispatched"));
return;
}
@@ -119,6 +120,7 @@ public class ClusterScheduler implements IScheduler {
// The FragmentInstances has been dispatched successfully to corresponding
host, we mark the
// QueryState to Running
stateMachine.transitionToRunning();
+ logger.info("{} transit to RUNNING", getLogHeader());
instances.forEach(
instance -> {
stateMachine.initialFragInstanceState(instance.getId(),
FragmentInstanceState.RUNNING);
@@ -126,6 +128,7 @@ public class ClusterScheduler implements IScheduler {
// TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment
instance
this.stateTracker.start();
+ logger.info("{} state tracker starts", getLogHeader());
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
index 4c64c3b7b6..780860e824 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
@@ -19,14 +19,20 @@
package org.apache.iotdb.db.mpp.plan.scheduler;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -49,36 +55,35 @@ public class SimpleFragInstanceDispatcher implements
IFragInstanceDispatcher {
public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance>
instances) {
return executor.submit(
() -> {
- throw new RuntimeException("Dispatch Error");
- // TSendFragmentInstanceResp resp = new
TSendFragmentInstanceResp(false);
- // for (FragmentInstance instance : instances) {
- // TEndPoint endPoint =
instance.getHostDataNode().getInternalEndPoint();
- // // TODO: (jackie tien) change the port
- // try (SyncDataNodeInternalServiceClient client =
- //
internalServiceClientManager.borrowClient(endPoint)) {
- // // TODO: (xingtanzjr) consider how to handle the
buffer here
- // ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
- // instance.serializeRequest(buffer);
- // buffer.flip();
- // TConsensusGroupId groupId =
instance.getRegionReplicaSet().getRegionId();
- // TSendFragmentInstanceReq req =
- // new TSendFragmentInstanceReq(
- // new TFragmentInstance(buffer), groupId,
- // instance.getType().toString());
- // resp = client.sendFragmentInstance(req);
- // } catch (IOException e) {
- // LOGGER.error("can't connect to node {}", endPoint,
e);
- // throw e;
- // }
- //
- // if (!resp.accepted) {
- // break;
- // }
- // }
- // return new FragInstanceDispatchResult(resp.accepted);
+ TSendFragmentInstanceResp resp = new
TSendFragmentInstanceResp(false);
+ for (FragmentInstance instance : instances) {
+ TEndPoint endPoint =
instance.getHostDataNode().getInternalEndPoint();
+ // TODO: (jackie tien) change the port
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
+ // TODO: (xingtanzjr) consider how to handle the buffer here
+ ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+ instance.serializeRequest(buffer);
+ buffer.flip();
+ TConsensusGroupId groupId =
instance.getRegionReplicaSet().getRegionId();
+ TSendFragmentInstanceReq req =
+ new TSendFragmentInstanceReq(
+ new TFragmentInstance(buffer), groupId,
+ instance.getType().toString());
+ resp = client.sendFragmentInstance(req);
+ } catch (IOException e) {
+ LOGGER.error("can't connect to node {}", endPoint, e);
+ throw e;
+ }
+ if (!resp.accepted) {
+ break;
+ }
+ }
+ return new FragInstanceDispatchResult(resp.accepted);
});
}
@Override
- public void abort() {}
+ public void abort() {
+ }
}