This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch AddLog in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9b777bc9d9ff1d0653a042055e65b6966a232df5 Author: JackieTien97 <[email protected]> AuthorDate: Sun Jun 19 20:29:31 2022 +0800 [IOTDB-3365] Add more log name in query processing and Fix NPE while fetchFragmentInstanceState --- .../execution/fragment/FragmentInstanceState.java | 4 +- .../scheduler/AbstractFragInsStateTracker.java | 1 - .../scheduler/FixedRateFragInsStateTracker.java | 5 +- .../mpp/plan/scheduler/SimpleQueryTerminator.java | 1 - .../service/thrift/impl/InternalServiceImpl.java | 57 +++++++++++++--------- 5 files changed, 39 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java index ab1b500215..c8f94f143a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java @@ -44,7 +44,9 @@ public enum FragmentInstanceState { /** Instance was aborted due to a failure in the query. The failure was not in this instance. */ ABORTED(true, true), /** Instance execution failed. */ - FAILED(true, true); + FAILED(true, true), + /** Instance is not found. */ + NO_SUCH_INSTANCE(false, false); public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES = Stream.of(FragmentInstanceState.values()) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java index 424b58eee1..1be7c12ab0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java @@ -65,7 +65,6 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException, IOException { - // TODO: (jackie tien) change the port TEndPoint endPoint = instance.getHostDataNode().internalEndPoint; try (SyncDataNodeInternalServiceClient client = internalServiceClientManager.borrowClient(endPoint)) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java index 755b628e91..6388e35e18 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.execution.QueryStateMachine; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; +import io.airlift.concurrent.SetThreadName; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,9 +76,9 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker { private void fetchStateAndUpdate() { for (FragmentInstance instance : instances) { - try { + try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) { FragmentInstanceState state = fetchState(instance); - logger.info("Instance {}'s State is {}", instance.getId(), state); + logger.info("State is {}", state); if (state != null) { stateMachine.updateFragInstanceState(instance.getId(), state); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java index 357c26fcf5..9b65092a71 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java @@ -69,7 +69,6 @@ public class SimpleQueryTerminator implements IQueryTerminator { return executor.submit( () -> { for (TEndPoint endPoint : relatedHost) { - // TODO (jackie tien) change the port try (SyncDataNodeInternalServiceClient client = internalServiceClientManager.borrowClient(endPoint)) { client.cancelQuery( diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java index 17efadbf68..0bbcbaa6e2 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InternalServiceImpl.java @@ -52,6 +52,7 @@ import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo; import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager; +import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState; import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator; import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance; @@ -89,6 +90,7 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.exception.NotImplementedException; +import io.airlift.concurrent.SetThreadName; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,7 +106,7 @@ public class InternalServiceImpl implements InternalService.Iface { private static final Logger LOGGER = LoggerFactory.getLogger(InternalServiceImpl.class); private final SchemaEngine schemaEngine = SchemaEngine.getInstance(); private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance(); - private final double loadBalanceThreshold = 0.1; + private static final double loadBalanceThreshold = 0.1; public InternalServiceImpl() { super(); @@ -187,41 +189,48 @@ public class InternalServiceImpl implements InternalService.Iface { @Override public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) { - FragmentInstanceInfo info = - FragmentInstanceManager.getInstance() - .getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId)); - return new TFragmentInstanceStateResp(info.getState().toString()); + FragmentInstanceId instanceId = FragmentInstanceId.fromThrift(req.fragmentInstanceId); + try (SetThreadName threadName = new SetThreadName(instanceId.getFullId())) { + FragmentInstanceInfo info = FragmentInstanceManager.getInstance().getInstanceInfo(instanceId); + return info != null + ? new TFragmentInstanceStateResp(info.getState().toString()) + : new TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString()); + } } @Override - public TCancelResp cancelQuery(TCancelQueryReq req) throws TException { - List<FragmentInstanceId> taskIds = - req.getFragmentInstanceIds().stream() - .map(FragmentInstanceId::fromThrift) - .collect(Collectors.toList()); - for (FragmentInstanceId taskId : taskIds) { - FragmentInstanceManager.getInstance().cancelTask(taskId); + public TCancelResp cancelQuery(TCancelQueryReq req) { + try (SetThreadName threadName = new SetThreadName(req.getQueryId())) { + LOGGER.info("start cancelling query."); + List<FragmentInstanceId> taskIds = + req.getFragmentInstanceIds().stream() + .map(FragmentInstanceId::fromThrift) + .collect(Collectors.toList()); + for (FragmentInstanceId taskId : taskIds) { + FragmentInstanceManager.getInstance().cancelTask(taskId); + } + LOGGER.info("finish cancelling query."); + return new TCancelResp(true); } - return new TCancelResp(true); } @Override - public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) throws TException { + public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) { throw new NotImplementedException(); } @Override - public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) throws TException { + public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) { throw new NotImplementedException(); } @Override - public TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req) throws TException { + public TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req) { throw new UnsupportedOperationException(); } @Override - public TSStatus createSchemaRegion(TCreateSchemaRegionReq req) throws TException { + public TSStatus createSchemaRegion(TCreateSchemaRegionReq req) { TSStatus tsStatus; try { PartialPath storageGroupPartitionPath = new PartialPath(req.getStorageGroup()); @@ -260,7 +269,7 @@ public class InternalServiceImpl implements InternalService.Iface { } @Override - public TSStatus createDataRegion(TCreateDataRegionReq req) throws TException { + public TSStatus createDataRegion(TCreateDataRegionReq req) { TSStatus tsStatus; try { TRegionReplicaSet regionReplicaSet = req.getRegionReplicaSet(); @@ -292,19 +301,19 @@ public class InternalServiceImpl implements InternalService.Iface { } @Override - public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) throws TException { + public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) { ClusterPartitionFetcher.getInstance().invalidAllCache(); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } @Override - public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) throws TException { + public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) { DataNodeSchemaCache.getInstance().cleanUp(); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } @Override - public THeartbeatResp getHeartBeat(THeartbeatReq req) throws TException { + public THeartbeatResp getHeartBeat(THeartbeatReq req) { THeartbeatResp resp = new THeartbeatResp(req.getHeartbeatTimestamp()); Random whetherToGetMetric = new Random(); if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric() @@ -356,7 +365,7 @@ public class InternalServiceImpl implements InternalService.Iface { } @Override - public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req) throws TException { + public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req) { if (AuthorizerManager.getInstance().invalidateCache(req.getUsername(), req.getRoleName())) { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } @@ -369,7 +378,7 @@ public class InternalServiceImpl implements InternalService.Iface { } @Override - public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) throws TException { + public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) { ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId); if (consensusGroupId instanceof DataRegionId) { @@ -400,7 +409,7 @@ public class InternalServiceImpl implements InternalService.Iface { } @Override - public TMigrateRegionResp migrateRegion(TMigrateRegionReq req) throws TException { + public TMigrateRegionResp migrateRegion(TMigrateRegionReq req) { TRegionReplicaSet regionReplicaSet = req.migrateRegion; TSStatus tsStatus; ConsensusGenericResponse consensusGenericResponse;
