This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 05a849184e [IOTDB-3365] Add more log name in query processing and Fix
NPE while fetchFragmentInstanceState (#6332)
05a849184e is described below
commit 05a849184efbd9740c6fe20bfd3b149e9cd4b1d0
Author: Jackie Tien <[email protected]>
AuthorDate: Sun Jun 19 21:49:47 2022 +0800
[IOTDB-3365] Add more log name in query processing and Fix NPE while
fetchFragmentInstanceState (#6332)
---
.../execution/fragment/FragmentInstanceState.java | 4 +-
.../scheduler/AbstractFragInsStateTracker.java | 1 -
.../scheduler/FixedRateFragInsStateTracker.java | 5 +-
.../mpp/plan/scheduler/SimpleQueryTerminator.java | 1 -
.../service/thrift/impl/InternalServiceImpl.java | 57 +++++++++++++---------
5 files changed, 39 insertions(+), 29 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
index ab1b500215..c8f94f143a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
@@ -44,7 +44,9 @@ public enum FragmentInstanceState {
/** Instance was aborted due to a failure in the query. The failure was not
in this instance. */
ABORTED(true, true),
/** Instance execution failed. */
- FAILED(true, true);
+ FAILED(true, true),
+ /** Instance is not found. */
+ NO_SUCH_INSTANCE(false, false);
public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES =
Stream.of(FragmentInstanceState.values())
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index 424b58eee1..1be7c12ab0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -65,7 +65,6 @@ public abstract class AbstractFragInsStateTracker implements
IFragInstanceStateT
protected FragmentInstanceState fetchState(FragmentInstance instance)
throws TException, IOException {
- // TODO: (jackie tien) change the port
TEndPoint endPoint = instance.getHostDataNode().internalEndPoint;
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 755b628e91..6388e35e18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import io.airlift.concurrent.SetThreadName;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,9 +76,9 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
private void fetchStateAndUpdate() {
for (FragmentInstance instance : instances) {
- try {
+ try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
FragmentInstanceState state = fetchState(instance);
- logger.info("Instance {}'s State is {}", instance.getId(), state);
+ logger.info("State is {}", state);
if (state != null) {
stateMachine.updateFragInstanceState(instance.getId(), state);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 357c26fcf5..9b65092a71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -69,7 +69,6 @@ public class SimpleQueryTerminator implements
IQueryTerminator {
return executor.submit(
() -> {
for (TEndPoint endPoint : relatedHost) {
- // TODO (jackie tien) change the port
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
client.cancelQuery(
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 17efadbf68..0bbcbaa6e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -89,6 +90,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
+import io.airlift.concurrent.SetThreadName;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,7 +106,7 @@ public class InternalServiceImpl implements
InternalService.Iface {
private static final Logger LOGGER =
LoggerFactory.getLogger(InternalServiceImpl.class);
private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
- private final double loadBalanceThreshold = 0.1;
+ private static final double loadBalanceThreshold = 0.1;
public InternalServiceImpl() {
super();
@@ -187,41 +189,48 @@ public class InternalServiceImpl implements
InternalService.Iface {
@Override
public TFragmentInstanceStateResp
fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) {
- FragmentInstanceInfo info =
- FragmentInstanceManager.getInstance()
-
.getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId));
- return new TFragmentInstanceStateResp(info.getState().toString());
+ FragmentInstanceId instanceId =
FragmentInstanceId.fromThrift(req.fragmentInstanceId);
+ try (SetThreadName threadName = new SetThreadName(instanceId.getFullId()))
{
+ FragmentInstanceInfo info =
FragmentInstanceManager.getInstance().getInstanceInfo(instanceId);
+ return info != null
+ ? new TFragmentInstanceStateResp(info.getState().toString())
+ : new
TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString());
+ }
}
@Override
- public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
- List<FragmentInstanceId> taskIds =
- req.getFragmentInstanceIds().stream()
- .map(FragmentInstanceId::fromThrift)
- .collect(Collectors.toList());
- for (FragmentInstanceId taskId : taskIds) {
- FragmentInstanceManager.getInstance().cancelTask(taskId);
+ public TCancelResp cancelQuery(TCancelQueryReq req) {
+ try (SetThreadName threadName = new SetThreadName(req.getQueryId())) {
+ LOGGER.info("start cancelling query.");
+ List<FragmentInstanceId> taskIds =
+ req.getFragmentInstanceIds().stream()
+ .map(FragmentInstanceId::fromThrift)
+ .collect(Collectors.toList());
+ for (FragmentInstanceId taskId : taskIds) {
+ FragmentInstanceManager.getInstance().cancelTask(taskId);
+ }
+ LOGGER.info("finish cancelling query.");
+ return new TCancelResp(true);
}
- return new TCancelResp(true);
}
@Override
- public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) throws
TException {
+ public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) {
throw new NotImplementedException();
}
@Override
- public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req)
throws TException {
+ public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) {
throw new NotImplementedException();
}
@Override
- public TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req) throws
TException {
+ public TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req) {
throw new UnsupportedOperationException();
}
@Override
- public TSStatus createSchemaRegion(TCreateSchemaRegionReq req) throws
TException {
+ public TSStatus createSchemaRegion(TCreateSchemaRegionReq req) {
TSStatus tsStatus;
try {
PartialPath storageGroupPartitionPath = new
PartialPath(req.getStorageGroup());
@@ -260,7 +269,7 @@ public class InternalServiceImpl implements
InternalService.Iface {
}
@Override
- public TSStatus createDataRegion(TCreateDataRegionReq req) throws TException
{
+ public TSStatus createDataRegion(TCreateDataRegionReq req) {
TSStatus tsStatus;
try {
TRegionReplicaSet regionReplicaSet = req.getRegionReplicaSet();
@@ -292,19 +301,19 @@ public class InternalServiceImpl implements
InternalService.Iface {
}
@Override
- public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) throws
TException {
+ public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) {
ClusterPartitionFetcher.getInstance().invalidAllCache();
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
- public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) throws
TException {
+ public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
DataNodeSchemaCache.getInstance().cleanUp();
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
- public THeartbeatResp getHeartBeat(THeartbeatReq req) throws TException {
+ public THeartbeatResp getHeartBeat(THeartbeatReq req) {
THeartbeatResp resp = new THeartbeatResp(req.getHeartbeatTimestamp());
Random whetherToGetMetric = new Random();
if
(MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()
@@ -356,7 +365,7 @@ public class InternalServiceImpl implements
InternalService.Iface {
}
@Override
- public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req)
throws TException {
+ public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req)
{
if (AuthorizerManager.getInstance().invalidateCache(req.getUsername(),
req.getRoleName())) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@@ -369,7 +378,7 @@ public class InternalServiceImpl implements
InternalService.Iface {
}
@Override
- public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) throws
TException {
+ public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
ConsensusGroupId consensusGroupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId);
if (consensusGroupId instanceof DataRegionId) {
@@ -400,7 +409,7 @@ public class InternalServiceImpl implements
InternalService.Iface {
}
@Override
- public TMigrateRegionResp migrateRegion(TMigrateRegionReq req) throws
TException {
+ public TMigrateRegionResp migrateRegion(TMigrateRegionReq req) {
TRegionReplicaSet regionReplicaSet = req.migrateRegion;
TSStatus tsStatus;
ConsensusGenericResponse consensusGenericResponse;