This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch AddLog
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9b777bc9d9ff1d0653a042055e65b6966a232df5
Author: JackieTien97 <[email protected]>
AuthorDate: Sun Jun 19 20:29:31 2022 +0800

    [IOTDB-3365] Add more log name in query processing and Fix NPE while 
fetchFragmentInstanceState
---
 .../execution/fragment/FragmentInstanceState.java  |  4 +-
 .../scheduler/AbstractFragInsStateTracker.java     |  1 -
 .../scheduler/FixedRateFragInsStateTracker.java    |  5 +-
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  |  1 -
 .../service/thrift/impl/InternalServiceImpl.java   | 57 +++++++++++++---------
 5 files changed, 39 insertions(+), 29 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
index ab1b500215..c8f94f143a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
@@ -44,7 +44,9 @@ public enum FragmentInstanceState {
   /** Instance was aborted due to a failure in the query. The failure was not 
in this instance. */
   ABORTED(true, true),
   /** Instance execution failed. */
-  FAILED(true, true);
+  FAILED(true, true),
+  /** Instance is not found. */
+  NO_SUCH_INSTANCE(false, false);
 
   public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES =
       Stream.of(FragmentInstanceState.values())
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index 424b58eee1..1be7c12ab0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -65,7 +65,6 @@ public abstract class AbstractFragInsStateTracker implements 
IFragInstanceStateT
 
   protected FragmentInstanceState fetchState(FragmentInstance instance)
       throws TException, IOException {
-    // TODO: (jackie tien) change the port
     TEndPoint endPoint = instance.getHostDataNode().internalEndPoint;
     try (SyncDataNodeInternalServiceClient client =
         internalServiceClientManager.borrowClient(endPoint)) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 755b628e91..6388e35e18 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 
+import io.airlift.concurrent.SetThreadName;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,9 +76,9 @@ public class FixedRateFragInsStateTracker extends 
AbstractFragInsStateTracker {
 
   private void fetchStateAndUpdate() {
     for (FragmentInstance instance : instances) {
-      try {
+      try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
         FragmentInstanceState state = fetchState(instance);
-        logger.info("Instance {}'s State is {}", instance.getId(), state);
+        logger.info("State is {}", state);
 
         if (state != null) {
           stateMachine.updateFragInstanceState(instance.getId(), state);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 357c26fcf5..9b65092a71 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -69,7 +69,6 @@ public class SimpleQueryTerminator implements 
IQueryTerminator {
     return executor.submit(
         () -> {
           for (TEndPoint endPoint : relatedHost) {
-            // TODO (jackie tien) change the port
             try (SyncDataNodeInternalServiceClient client =
                 internalServiceClientManager.borrowClient(endPoint)) {
               client.cancelQuery(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
index 17efadbf68..0bbcbaa6e2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -89,6 +90,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
+import io.airlift.concurrent.SetThreadName;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,7 +106,7 @@ public class InternalServiceImpl implements 
InternalService.Iface {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(InternalServiceImpl.class);
   private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
   private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
-  private final double loadBalanceThreshold = 0.1;
+  private static final double loadBalanceThreshold = 0.1;
 
   public InternalServiceImpl() {
     super();
@@ -187,41 +189,48 @@ public class InternalServiceImpl implements 
InternalService.Iface {
 
   @Override
   public TFragmentInstanceStateResp 
fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) {
-    FragmentInstanceInfo info =
-        FragmentInstanceManager.getInstance()
-            
.getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId));
-    return new TFragmentInstanceStateResp(info.getState().toString());
+    FragmentInstanceId instanceId = 
FragmentInstanceId.fromThrift(req.fragmentInstanceId);
+    try (SetThreadName threadName = new SetThreadName(instanceId.getFullId())) 
{
+      FragmentInstanceInfo info = 
FragmentInstanceManager.getInstance().getInstanceInfo(instanceId);
+      return info != null
+          ? new TFragmentInstanceStateResp(info.getState().toString())
+          : new 
TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString());
+    }
   }
 
   @Override
-  public TCancelResp cancelQuery(TCancelQueryReq req) throws TException {
-    List<FragmentInstanceId> taskIds =
-        req.getFragmentInstanceIds().stream()
-            .map(FragmentInstanceId::fromThrift)
-            .collect(Collectors.toList());
-    for (FragmentInstanceId taskId : taskIds) {
-      FragmentInstanceManager.getInstance().cancelTask(taskId);
+  public TCancelResp cancelQuery(TCancelQueryReq req) {
+    try (SetThreadName threadName = new SetThreadName(req.getQueryId())) {
+      LOGGER.info("start cancelling query.");
+      List<FragmentInstanceId> taskIds =
+          req.getFragmentInstanceIds().stream()
+              .map(FragmentInstanceId::fromThrift)
+              .collect(Collectors.toList());
+      for (FragmentInstanceId taskId : taskIds) {
+        FragmentInstanceManager.getInstance().cancelTask(taskId);
+      }
+      LOGGER.info("finish cancelling query.");
+      return new TCancelResp(true);
     }
-    return new TCancelResp(true);
   }
 
   @Override
-  public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) throws 
TException {
+  public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) {
     throw new NotImplementedException();
   }
 
   @Override
-  public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) 
throws TException {
+  public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) {
     throw new NotImplementedException();
   }
 
   @Override
-  public TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req) throws 
TException {
+  public TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public TSStatus createSchemaRegion(TCreateSchemaRegionReq req) throws 
TException {
+  public TSStatus createSchemaRegion(TCreateSchemaRegionReq req) {
     TSStatus tsStatus;
     try {
       PartialPath storageGroupPartitionPath = new 
PartialPath(req.getStorageGroup());
@@ -260,7 +269,7 @@ public class InternalServiceImpl implements 
InternalService.Iface {
   }
 
   @Override
-  public TSStatus createDataRegion(TCreateDataRegionReq req) throws TException 
{
+  public TSStatus createDataRegion(TCreateDataRegionReq req) {
     TSStatus tsStatus;
     try {
       TRegionReplicaSet regionReplicaSet = req.getRegionReplicaSet();
@@ -292,19 +301,19 @@ public class InternalServiceImpl implements 
InternalService.Iface {
   }
 
   @Override
-  public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) throws 
TException {
+  public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) {
     ClusterPartitionFetcher.getInstance().invalidAllCache();
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   @Override
-  public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) throws 
TException {
+  public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
     DataNodeSchemaCache.getInstance().cleanUp();
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   @Override
-  public THeartbeatResp getHeartBeat(THeartbeatReq req) throws TException {
+  public THeartbeatResp getHeartBeat(THeartbeatReq req) {
     THeartbeatResp resp = new THeartbeatResp(req.getHeartbeatTimestamp());
     Random whetherToGetMetric = new Random();
     if 
(MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()
@@ -356,7 +365,7 @@ public class InternalServiceImpl implements 
InternalService.Iface {
   }
 
   @Override
-  public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req) 
throws TException {
+  public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req) 
{
     if (AuthorizerManager.getInstance().invalidateCache(req.getUsername(), 
req.getRoleName())) {
       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
     }
@@ -369,7 +378,7 @@ public class InternalServiceImpl implements 
InternalService.Iface {
   }
 
   @Override
-  public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) throws 
TException {
+  public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
     ConsensusGroupId consensusGroupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId);
     if (consensusGroupId instanceof DataRegionId) {
@@ -400,7 +409,7 @@ public class InternalServiceImpl implements 
InternalService.Iface {
   }
 
   @Override
-  public TMigrateRegionResp migrateRegion(TMigrateRegionReq req) throws 
TException {
+  public TMigrateRegionResp migrateRegion(TMigrateRegionReq req) {
     TRegionReplicaSet regionReplicaSet = req.migrateRegion;
     TSStatus tsStatus;
     ConsensusGenericResponse consensusGenericResponse;

Reply via email to