This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_log
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/query_log by this push:
     new c2d89d0f7d make interrupt() more suitable
c2d89d0f7d is described below

commit c2d89d0f7d0f035f9d457beb5fa4e83a210e85d2
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Thu May 5 12:25:09 2022 +0800

    make interrupt() more suitable
---
 .../statemachine/SchemaRegionStateMachine.java     |  2 +-
 .../mpp/plan/execution/config/ConfigExecution.java |  8 ++-
 .../db/mpp/plan/planner/plan/FragmentInstance.java |  6 +--
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  3 ++
 .../scheduler/SimpleFragInstanceDispatcher.java    | 61 ++++++++++++----------
 5 files changed, 46 insertions(+), 34 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 68277cce8a..4fbe1c4a0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -78,7 +78,7 @@ public class SchemaRegionStateMachine extends 
BaseStateMachine {
 
   @Override
   protected DataSet read(FragmentInstance fragmentInstance) {
-    logger.info("Execute read plan in SchemaRegionStateMachine");
+    logger.info("SchemaRegionStateMachine[{}]: Execute read plan: 
FragmentInstance-{}", schemaRegion.getSchemaRegionId(), 
fragmentInstance.getId());
     return 
QUERY_INSTANCE_MANAGER.execSchemaQueryFragmentInstance(fragmentInstance, 
schemaRegion);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index eccb8e19d6..65e9e317d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -94,7 +94,9 @@ public class ConfigExecution implements IQueryExecution {
           },
           executor);
     } catch (Throwable e) {
-      Thread.currentThread().interrupt();
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
       fail(e);
     }
   }
@@ -121,7 +123,9 @@ public class ConfigExecution implements IQueryExecution {
           statusCode == TSStatusCode.SUCCESS_STATUS ? "" : 
stateMachine.getFailureMessage();
       return new ExecutionResult(context.getQueryId(), 
RpcUtils.getStatus(statusCode, message));
     } catch (InterruptedException | ExecutionException e) {
-      Thread.currentThread().interrupt();
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
       return new ExecutionResult(
           context.getQueryId(),
           RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR, 
e.getMessage()));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
index dbaae2471e..f6011c4a85 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/FragmentInstance.java
@@ -116,12 +116,12 @@ public class FragmentInstance implements 
IConsensusRequest {
     ret.append(String.format("FragmentInstance-%s:", getId()));
     ret.append(
         String.format(
-            "Host: %s", getHostDataNode() == null ? "Not set" : 
getHostDataNode().dataNodeId));
+            "Host: %s ", getHostDataNode() == null ? "Not set" : 
getHostDataNode().dataNodeId));
     ret.append(
         String.format(
-            "Region: %s",
+            "Region: %s ",
             getRegionReplicaSet() == null ? "Not set" : 
getRegionReplicaSet().getRegionId()));
-    ret.append("---- Plan Node Tree ----\n");
+    ret.append("\n---- Plan Node Tree ----\n");
     ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
     return ret.toString();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index db4bcfdb84..ced23a70ac 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -98,6 +98,7 @@ public class ClusterScheduler implements IScheduler {
     try {
       FragInstanceDispatchResult result = dispatchResultFuture.get();
       if (!result.isSuccessful()) {
+        logger.error("{} dispatch failed.", getLogHeader());
         stateMachine.transitionToFailed(new IllegalStateException("Fragment 
cannot be dispatched"));
         return;
       }
@@ -119,6 +120,7 @@ public class ClusterScheduler implements IScheduler {
     // The FragmentInstances has been dispatched successfully to corresponding 
host, we mark the
     // QueryState to Running
     stateMachine.transitionToRunning();
+    logger.info("{} transit to RUNNING", getLogHeader());
     instances.forEach(
         instance -> {
           stateMachine.initialFragInstanceState(instance.getId(), 
FragmentInstanceState.RUNNING);
@@ -126,6 +128,7 @@ public class ClusterScheduler implements IScheduler {
 
     // TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment 
instance
     this.stateTracker.start();
+    logger.info("{} state tracker starts", getLogHeader());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
index 4c64c3b7b6..780860e824 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
@@ -19,14 +19,20 @@
 
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -49,36 +55,35 @@ public class SimpleFragInstanceDispatcher implements 
IFragInstanceDispatcher {
   public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> 
instances) {
     return executor.submit(
         () -> {
-          throw new RuntimeException("Dispatch Error");
-          //          TSendFragmentInstanceResp resp = new 
TSendFragmentInstanceResp(false);
-          //          for (FragmentInstance instance : instances) {
-          //            TEndPoint endPoint = 
instance.getHostDataNode().getInternalEndPoint();
-          //            // TODO: (jackie tien) change the port
-          //            try (SyncDataNodeInternalServiceClient client =
-          //                
internalServiceClientManager.borrowClient(endPoint)) {
-          //              // TODO: (xingtanzjr) consider how to handle the 
buffer here
-          //              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
-          //              instance.serializeRequest(buffer);
-          //              buffer.flip();
-          //              TConsensusGroupId groupId = 
instance.getRegionReplicaSet().getRegionId();
-          //              TSendFragmentInstanceReq req =
-          //                  new TSendFragmentInstanceReq(
-          //                      new TFragmentInstance(buffer), groupId,
-          // instance.getType().toString());
-          //              resp = client.sendFragmentInstance(req);
-          //            } catch (IOException e) {
-          //              LOGGER.error("can't connect to node {}", endPoint, 
e);
-          //              throw e;
-          //            }
-          //
-          //            if (!resp.accepted) {
-          //              break;
-          //            }
-          //          }
-          //          return new FragInstanceDispatchResult(resp.accepted);
+          TSendFragmentInstanceResp resp = new 
TSendFragmentInstanceResp(false);
+          for (FragmentInstance instance : instances) {
+            TEndPoint endPoint = 
instance.getHostDataNode().getInternalEndPoint();
+            // TODO: (jackie tien) change the port
+            try (SyncDataNodeInternalServiceClient client =
+                     internalServiceClientManager.borrowClient(endPoint)) {
+              // TODO: (xingtanzjr) consider how to handle the buffer here
+              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+              instance.serializeRequest(buffer);
+              buffer.flip();
+              TConsensusGroupId groupId = 
instance.getRegionReplicaSet().getRegionId();
+              TSendFragmentInstanceReq req =
+                  new TSendFragmentInstanceReq(
+                      new TFragmentInstance(buffer), groupId,
+                      instance.getType().toString());
+              resp = client.sendFragmentInstance(req);
+            } catch (IOException e) {
+              LOGGER.error("can't connect to node {}", endPoint, e);
+              throw e;
+            }
+            if (!resp.accepted) {
+              break;
+            }
+          }
+          return new FragInstanceDispatchResult(resp.accepted);
         });
   }
 
   @Override
-  public void abort() {}
+  public void abort() {
+  }
 }

Reply via email to