This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/write_opt_version1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 22ee02fe43b9deeb0c668e03a2e22024be4eed66
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Jun 20 22:58:35 2022 +0800

    fix the issue in queryTerminator
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  4 +-
 .../iotdb/db/mpp/execution/StateMachine.java       |  7 ++-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  2 +-
 .../org/apache/iotdb/db/mpp/plan/StepTracker.java  | 15 +++--
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  | 11 ++++
 .../db/mpp/plan/execution/QueryExecution.java      |  5 ++
 .../db/mpp/plan/scheduler/ClusterScheduler.java    | 30 ++++++----
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  | 65 +++++++++++++---------
 .../db/mpp/plan/scheduler/StandaloneScheduler.java |  6 +-
 9 files changed, 98 insertions(+), 47 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5de782e678..1ca1c2c794 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -928,10 +928,10 @@ public class IoTDBConfig {
   private int triggerForwardMQTTPoolSize = 4;
 
   /** ThreadPool size for read operation in coordinator */
-  private int coordinatorReadExecutorSize = 100;
+  private int coordinatorReadExecutorSize = 50;
 
   /** ThreadPool size for write operation in coordinator */
-  private int coordinatorWriteExecutorSize = 100;
+  private int coordinatorWriteExecutorSize = 50;
 
   IoTDBConfig() {}
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
index c1b7e76360..e53c2cb02d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
@@ -13,6 +13,8 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.db.mpp.plan.StepTracker;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -270,6 +272,7 @@ public class StateMachine<T> {
    * execution. The listener is notified immediately of the current state.
    */
   public void addStateChangeListener(StateChangeListener<T> 
stateChangeListener) {
+    long startTime = System.nanoTime();
     requireNonNull(stateChangeListener, "stateChangeListener is null");
 
     boolean inTerminalState;
@@ -281,7 +284,7 @@ public class StateMachine<T> {
         stateChangeListeners.add(stateChangeListener);
       }
     }
-
+    StepTracker.trace("addStateListener", startTime, System.nanoTime());
     // fire state change listener with the current state
     // always fire listener callbacks from a different thread
     safeExecute(() -> stateChangeListener.stateChanged(currentState));
@@ -310,6 +313,8 @@ public class StateMachine<T> {
 
   private void safeExecute(Runnable command) {
     try {
+      //      new Thread(command).start();
+      //      command.run();
       executor.execute(command);
     } catch (RejectedExecutionException e) {
       if ((executor instanceof ExecutorService) && ((ExecutorService) 
executor).isShutdown()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 495af1f3e0..f9a66b7ec1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -57,7 +57,7 @@ public class Coordinator {
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
   private static final String COORDINATOR_WRITE_EXECUTOR_NAME = 
"MPPCoordinatorWrite";
   private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = 
"MPPCoordinatorScheduled";
-  private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 1;
+  private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
 
   private static final IClientManager<TEndPoint, 
SyncDataNodeInternalServiceClient>
       INTERNAL_SERVICE_CLIENT_MANAGER =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
index 1fe7df7be0..9d3da35f72 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
@@ -27,16 +27,18 @@ import java.util.Map;
 
 class Metric {
   private static final Logger logger = LoggerFactory.getLogger(Metric.class);
+  private static final int PRINT_RATE = 100;
+
   public String stepName;
   public long invokeCount;
   public long totalTime;
-  public long last100Time;
+  public long lastCycleTime;
 
   public Metric(String stepName) {
     this.stepName = stepName;
     this.invokeCount = 0;
     this.totalTime = 0;
-    this.last100Time = 0;
+    this.lastCycleTime = 0;
   }
 
   public void trace(long startTime, long endTime) {
@@ -45,17 +47,18 @@ class Metric {
   }
 
   public void tryPrint() {
-    if (invokeCount % 100 == 0) {
+    if (invokeCount % PRINT_RATE == 0) {
       logger.info(
           String.format(
-              "step metrics [%d]-[%s] - Total: %d, SUM: %.2fms, AVG: %fms, 
Last100AVG: %fms",
+              "step metrics [%d]-[%s] - Total: %d, SUM: %.2fms, AVG: %fms, 
Last%dAVG: %fms",
               Thread.currentThread().getId(),
               stepName,
               invokeCount,
               totalTime * 1.0 / 1000000,
               totalTime * 1.0 / 1000000 / invokeCount,
-              (totalTime * 1.0 - last100Time) / 1000000 / 100));
-      last100Time = totalTime;
+              PRINT_RATE,
+              (totalTime * 1.0 - lastCycleTime) / 1000000 / PRINT_RATE));
+      lastCycleTime = totalTime;
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 84db6df7f7..9f72dc4802 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -96,10 +96,21 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     schemaFetchStatement.setSchemaPartition(schemaPartition);
 
     SchemaTree result = executeSchemaFetchQuery(schemaFetchStatement);
+    //    SchemaTree result = mockFetch(patternTree);
     result.setStorageGroups(storageGroups);
     return result;
   }
 
+  public SchemaTree mockFetch(PathPatternTree patternTree) {
+    SchemaTree tree = new SchemaTree();
+    List<PartialPath> paths = patternTree.getAllPathPatterns();
+    for (PartialPath path : paths) {
+      tree.appendSingleMeasurement(
+          path, new MeasurementSchema(path.getMeasurement(), 
TSDataType.FLOAT), null, false);
+    }
+    return tree;
+  }
+
   private SchemaTree executeSchemaFetchQuery(SchemaFetchStatement 
schemaFetchStatement) {
     long queryId = SessionManager.getInstance().requestQueryId(false);
     try {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index a7a39564ef..d2d8b1c3dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -349,9 +349,14 @@ public class QueryExecution implements IQueryExecution {
       // Although we monitor the state to transition to RUNNING, the future 
will return if any
       // Terminated state is triggered
       try {
+        if (stateMachine.getState() == QueryState.FINISHED) {
+          return getExecutionResult(QueryState.FINISHED);
+        }
         SettableFuture<QueryState> future = SettableFuture.create();
+        final long addStart = System.nanoTime();
         stateMachine.addStateChangeListener(
             state -> {
+              StepTracker.trace("stateQueue", addStart, System.nanoTime());
               if (state == QueryState.RUNNING || state.isDone()) {
                 future.set(state);
               }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 8390ac6fe5..989b025660 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -57,8 +57,8 @@ public class ClusterScheduler implements IScheduler {
   private final List<FragmentInstance> instances;
 
   private final IFragInstanceDispatcher dispatcher;
-  private final IFragInstanceStateTracker stateTracker;
-  private final IQueryTerminator queryTerminator;
+  private IFragInstanceStateTracker stateTracker;
+  private IQueryTerminator queryTerminator;
 
   public ClusterScheduler(
       MPPQueryContext queryContext,
@@ -75,12 +75,18 @@ public class ClusterScheduler implements IScheduler {
     this.dispatcher =
         new FragmentInstanceDispatcherImpl(
             queryType, executor, writeOperationExecutor, 
internalServiceClientManager);
-    this.stateTracker =
-        new FixedRateFragInsStateTracker(
-            stateMachine, executor, scheduledExecutor, instances, 
internalServiceClientManager);
-    this.queryTerminator =
-        new SimpleQueryTerminator(
-            executor, queryContext.getQueryId(), instances, 
internalServiceClientManager);
+    if (queryType == QueryType.READ) {
+      this.stateTracker =
+          new FixedRateFragInsStateTracker(
+              stateMachine, executor, scheduledExecutor, instances, 
internalServiceClientManager);
+      this.queryTerminator =
+          new SimpleQueryTerminator(
+              executor,
+              scheduledExecutor,
+              queryContext.getQueryId(),
+              instances,
+              internalServiceClientManager);
+    }
   }
 
   @Override
@@ -132,9 +138,13 @@ public class ClusterScheduler implements IScheduler {
     // TODO: It seems that it is unnecessary to check whether they are null or 
not. Is it a best
     // practice ?
     dispatcher.abort();
-    stateTracker.abort();
+    if (stateTracker != null) {
+      stateTracker.abort();
+    }
     // TODO: (xingtanzjr) handle the exception when the termination cannot 
succeed
-    queryTerminator.terminate();
+    if (queryTerminator != null) {
+      queryTerminator.terminate();
+    }
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 357c26fcf5..55e6047555 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -32,67 +32,80 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class SimpleQueryTerminator implements IQueryTerminator {
   private static final Logger logger = 
LoggerFactory.getLogger(SimpleQueryTerminator.class);
   private static final long TERMINATION_GRACE_PERIOD_IN_MS = 1000L;
   private final ExecutorService executor;
+  protected ScheduledExecutorService scheduledExecutor;
   private final QueryId queryId;
-  private final List<FragmentInstance> fragmentInstances;
+  private List<TEndPoint> relatedHost;
+  private Map<TEndPoint, List<TFragmentInstanceId>> ownedFragmentInstance;
 
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       internalServiceClientManager;
 
   public SimpleQueryTerminator(
       ExecutorService executor,
+      ScheduledExecutorService scheduledExecutor,
       QueryId queryId,
       List<FragmentInstance> fragmentInstances,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
internalServiceClientManager) {
     this.executor = executor;
+    this.scheduledExecutor = scheduledExecutor;
     this.queryId = queryId;
-    this.fragmentInstances = fragmentInstances;
     this.internalServiceClientManager = internalServiceClientManager;
+    calculateParameter(fragmentInstances);
+  }
+
+  private void calculateParameter(List<FragmentInstance> instances) {
+    this.relatedHost = getRelatedHost(instances);
+    this.ownedFragmentInstance = new HashMap<>();
+    for (TEndPoint endPoint : relatedHost) {
+      ownedFragmentInstance.put(endPoint, 
getRelatedFragmentInstances(endPoint, instances));
+    }
   }
 
   @Override
   public Future<Boolean> terminate() {
-    List<TEndPoint> relatedHost = getRelatedHost();
-    try {
-      Thread.sleep(TERMINATION_GRACE_PERIOD_IN_MS);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
+    return scheduledExecutor.schedule(
+        this::syncTerminate, TERMINATION_GRACE_PERIOD_IN_MS, 
TimeUnit.MILLISECONDS);
+  }
+
+  public Boolean syncTerminate() {
+    for (TEndPoint endPoint : relatedHost) {
+      // TODO (jackie tien) change the port
+      try (SyncDataNodeInternalServiceClient client =
+          internalServiceClientManager.borrowClient(endPoint)) {
+        client.cancelQuery(
+            new TCancelQueryReq(queryId.getId(), 
ownedFragmentInstance.get(endPoint)));
+      } catch (IOException e) {
+        logger.error("can't connect to node {}", endPoint, e);
+        return false;
+      } catch (TException e) {
+        return false;
+      }
     }
-    return executor.submit(
-        () -> {
-          for (TEndPoint endPoint : relatedHost) {
-            // TODO (jackie tien) change the port
-            try (SyncDataNodeInternalServiceClient client =
-                internalServiceClientManager.borrowClient(endPoint)) {
-              client.cancelQuery(
-                  new TCancelQueryReq(queryId.getId(), 
getRelatedFragmentInstances(endPoint)));
-            } catch (IOException e) {
-              logger.error("can't connect to node {}", endPoint, e);
-              return false;
-            } catch (TException e) {
-              return false;
-            }
-          }
-          return true;
-        });
+    return true;
   }
 
-  private List<TEndPoint> getRelatedHost() {
+  private List<TEndPoint> getRelatedHost(List<FragmentInstance> 
fragmentInstances) {
     return fragmentInstances.stream()
         .map(instance -> instance.getHostDataNode().internalEndPoint)
         .distinct()
         .collect(Collectors.toList());
   }
 
-  private List<TFragmentInstanceId> getRelatedFragmentInstances(TEndPoint 
endPoint) {
+  private List<TFragmentInstanceId> getRelatedFragmentInstances(
+      TEndPoint endPoint, List<FragmentInstance> fragmentInstances) {
     return fragmentInstances.stream()
         .filter(instance -> 
instance.getHostDataNode().internalEndPoint.equals(endPoint))
         .map(instance -> instance.getId().toThrift())
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index 5433b07c7c..f4b7664c57 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -91,7 +91,11 @@ public class StandaloneScheduler implements IScheduler {
             stateMachine, executor, scheduledExecutor, instances, 
internalServiceClientManager);
     this.queryTerminator =
         new SimpleQueryTerminator(
-            executor, queryContext.getQueryId(), instances, 
internalServiceClientManager);
+            executor,
+            scheduledExecutor,
+            queryContext.getQueryId(),
+            instances,
+            internalServiceClientManager);
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning

Reply via email to