This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch dynamic_compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dynamic_compaction by this 
push:
     new 335ed5c  async hitter
335ed5c is described below

commit 335ed5c53fd4e208f85f0aa04cb95e048bf3b080
Author: EJTTianyu <[email protected]>
AuthorDate: Sat May 22 03:29:03 2021 +0800

    async hitter
---
 .../org/apache/iotdb/db/concurrent/ThreadName.java |  1 +
 .../db/engine/heavyhitter/QueryHeavyHitters.java   |  5 ++
 .../db/engine/heavyhitter/QueryHitterManager.java  | 87 +++++++++++++++++++++-
 .../engine/heavyhitter/hitter/DefaultHitter.java   | 15 ++++
 .../engine/heavyhitter/hitter/HashMapHitter.java   | 15 +++-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    | 11 ++-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  7 ++
 .../org/apache/iotdb/db/service/ServiceType.java   |  1 +
 8 files changed, 131 insertions(+), 11 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java 
b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index a901ebb..dfdd16c 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -35,6 +35,7 @@ public enum ThreadName {
   FLUSH_SERVICE("Flush-ServerServiceImpl"),
   FLUSH_SUB_TASK_SERVICE("Flush-SubTask-ServerServiceImpl"),
   COMPACTION_SERVICE("Compaction-ServerServiceImpl"),
+  HITTER_SERVICE("Hitter"),
   WAL_DAEMON("IoTDB-MultiFileLogNodeManager-Sync-Thread"),
   WAL_FORCE_DAEMON("IoTDB-MultiFileLogNodeManager-Force-Thread"),
   INDEX_SERVICE("Index-ServerServiceImpl"),
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHeavyHitters.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHeavyHitters.java
index b9c0989..1b720d8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHeavyHitters.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHeavyHitters.java
@@ -29,6 +29,11 @@ import org.apache.iotdb.db.metadata.PartialPath;
 public interface QueryHeavyHitters {
 
   /**
+   * accept time series path list to estimate query frequency
+   */
+  void acceptQuerySeriesList(List<PartialPath> queryPaths);
+
+  /**
    * 用于接收查询的时间序列
    */
   void acceptQuerySeries(PartialPath queryPath);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHitterManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHitterManager.java
index cff5504..fb36ee9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHitterManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHitterManager.java
@@ -19,19 +19,37 @@
 
 package org.apache.iotdb.db.engine.heavyhitter;
 
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.heavyhitter.hitter.DefaultHitter;
 import org.apache.iotdb.db.engine.heavyhitter.hitter.HashMapHitter;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class QueryHitterManager {
+public class QueryHitterManager implements IService {
 
-  private static final QueryHeavyHitters INSTANCE = loadQueryHitters();
+  private static final Logger logger =
+      LoggerFactory.getLogger(QueryHitterManager.class);
+  private static final QueryHitterManager INSTANCE = new QueryHitterManager();
+  private ExecutorService pool;
+  private QueryHeavyHitters queryHeavyHitters;
 
-  public static QueryHeavyHitters getQueryHitter() {
+  public static QueryHitterManager getInstance() {
     return INSTANCE;
   }
 
-  private static QueryHeavyHitters loadQueryHitters() {
+  public QueryHeavyHitters getQueryHitter() {
+    return queryHeavyHitters;
+  }
+
+  private QueryHeavyHitters loadQueryHitters() {
     switch 
(IoTDBDescriptor.getInstance().getConfig().getQueryHitterStrategy()) {
       case HASH_STRATEGY:
         return new 
HashMapHitter(IoTDBDescriptor.getInstance().getConfig().getMaxHitterNum());
@@ -41,4 +59,65 @@ public class QueryHitterManager {
     }
   }
 
+  @Override
+  public void start() throws StartupException {
+    if (pool == null) {
+      pool = IoTDBThreadPoolFactory.newScheduledThreadPool(
+          1, ThreadName.HITTER_SERVICE.getName());
+    }
+    if (queryHeavyHitters == null) {
+      queryHeavyHitters = loadQueryHitters();
+    }
+    logger.info("QueryHitterManager started.");
+  }
+
+  @Override
+  public void stop() {
+    if (pool != null) {
+      long startTime = System.currentTimeMillis();
+      while (!pool.isTerminated() || !pool.isTerminated()) {
+        int timeMillis = 0;
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          logger.error(
+              "QueryHitterManager {} shutdown",
+              ThreadName.HITTER_SERVICE.getName(),
+              e);
+          Thread.currentThread().interrupt();
+        }
+        timeMillis += 200;
+        long time = System.currentTimeMillis() - startTime;
+        if (timeMillis % 60_000 == 0) {
+          logger.warn("QueryHitterManager has wait for {} seconds to stop", 
time / 1000);
+        }
+      }
+    }
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.HITTER_SERVICE;
+  }
+
+  public void submitTask(HitterTask hitterTask) {
+    if (pool != null && !pool.isTerminated()) {
+      pool.submit(hitterTask);
+    }
+  }
+
+  public class HitterTask implements Runnable {
+
+    private List<PartialPath> queryPaths;
+
+    public HitterTask(List<PartialPath> queryPaths) {
+      this.queryPaths = queryPaths;
+    }
+
+    @Override
+    public void run() {
+      getQueryHitter().acceptQuerySeriesList(queryPaths);
+    }
+  }
+
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
index 0bf8b18..3d23f32 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.engine.heavyhitter.hitter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.heavyhitter.QueryHeavyHitters;
@@ -37,12 +39,25 @@ import org.slf4j.LoggerFactory;
 public class DefaultHitter implements QueryHeavyHitters {
 
   private static final Logger logger = 
LoggerFactory.getLogger(DefaultHitter.class);
+  protected final ReadWriteLock hitterLock = new ReentrantReadWriteLock();
 
   public DefaultHitter(int maxHitterNum) {
 
   }
 
   @Override
+  public void acceptQuerySeriesList(List<PartialPath> queryPaths) {
+    hitterLock.writeLock().lock();
+    try {
+      for (PartialPath path : queryPaths) {
+        acceptQuerySeries(path);
+      }
+    } finally {
+      hitterLock.writeLock().unlock();
+    }
+  }
+
+  @Override
   public void acceptQuerySeries(PartialPath queryPath) {
     // do nothing
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
index f9b735d..f097f5a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
@@ -63,19 +63,26 @@ public class HashMapHitter implements QueryHeavyHitters {
   }
 
   @Override
-  public void acceptQuerySeries(PartialPath queryPath) {
+  public void acceptQuerySeriesList(List<PartialPath> queryPaths) {
     hitterLock.writeLock().lock();
     try {
-      if (queryPath == null) {
-        return;
+      for (PartialPath path : queryPaths) {
+        acceptQuerySeries(path);
       }
-      counter.put(queryPath, counter.getOrDefault(queryPath, 0) + 1);
     } finally {
       hitterLock.writeLock().unlock();
     }
   }
 
   @Override
+  public void acceptQuerySeries(PartialPath queryPath) {
+    if (queryPath == null) {
+      return;
+    }
+    counter.put(queryPath, counter.getOrDefault(queryPath, 0) + 1);
+  }
+
+  @Override
   public List<PartialPath> getTopCompactionSeries(PartialPath sgName) throws 
MetadataException {
     hitterLock.writeLock().lock();
     try {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java 
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index f6f7157..e0a9698 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -593,6 +593,11 @@ public class PhysicalGenerator {
     }
     try {
       deduplicate(queryPlan, fetchSize);
+      // estimate time series' query frequency
+      if (queryPlan instanceof RawDataQueryPlan) {
+        
QueryHitterManager.getInstance().submitTask(QueryHitterManager.getInstance().new
 HitterTask(
+            ((RawDataQueryPlan) queryPlan).getDeduplicatedPaths()));
+      }
     } catch (MetadataException e) {
       throw new QueryProcessException(e);
     }
@@ -681,9 +686,9 @@ public class PhysicalGenerator {
     queryPlan.setDataTypes(dataTypes);
 
     // add query to hitter
-    for (PartialPath path: paths) {
-      QueryHitterManager.getQueryHitter().acceptQuerySeries(path);
-    }
+    // for (PartialPath path: paths) {
+      // QueryHitterManager.getQueryHitter().acceptQuerySeries(path);
+    // }
 
     // deduplicate from here
     if (queryPlan instanceof AlignByDevicePlan) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java 
b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index a67b345..9401b43 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -27,7 +27,9 @@ import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
 import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.heavyhitter.QueryHitterManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -112,6 +114,11 @@ public class IoTDB implements IoTDBMBean {
     JMXService.registerMBean(getInstance(), mbeanName);
     registerManager.register(StorageEngine.getInstance());
 
+    if (IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
+        == CompactionStrategy.HITTER_LEVEL_COMPACTION) {
+      registerManager.register(QueryHitterManager.getInstance());
+    }
+
     // When registering statMonitor, we should start recovering some statistics
     // with latest values stored
     // Warn: registMonitor() method should be called after systemDataRecovery()
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java 
b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 0d50afb..edf3539 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -38,6 +38,7 @@ public enum ServiceType {
   UPGRADE_SERVICE("UPGRADE DataService", ""),
   MERGE_SERVICE("Merge Manager", "Merge Manager"),
   COMPACTION_SERVICE("Compaction Manager", "Compaction Manager"),
+  HITTER_SERVICE("Hitter Manager", "Hitter Manager"),
   PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", 
"PERFORMANCE_STATISTIC_SERVICE"),
   TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
   CACHE_HIT_RATIO_DISPLAY_SERVICE("CACHE_HIT_RATIO_DISPLAY_SERVICE",

Reply via email to