This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch dynamic_compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dynamic_compaction by this
push:
new 335ed5c async hitter
335ed5c is described below
commit 335ed5c53fd4e208f85f0aa04cb95e048bf3b080
Author: EJTTianyu <[email protected]>
AuthorDate: Sat May 22 03:29:03 2021 +0800
async hitter
---
.../org/apache/iotdb/db/concurrent/ThreadName.java | 1 +
.../db/engine/heavyhitter/QueryHeavyHitters.java | 5 ++
.../db/engine/heavyhitter/QueryHitterManager.java | 87 +++++++++++++++++++++-
.../engine/heavyhitter/hitter/DefaultHitter.java | 15 ++++
.../engine/heavyhitter/hitter/HashMapHitter.java | 15 +++-
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 11 ++-
.../java/org/apache/iotdb/db/service/IoTDB.java | 7 ++
.../org/apache/iotdb/db/service/ServiceType.java | 1 +
8 files changed, 131 insertions(+), 11 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index a901ebb..dfdd16c 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -35,6 +35,7 @@ public enum ThreadName {
FLUSH_SERVICE("Flush-ServerServiceImpl"),
FLUSH_SUB_TASK_SERVICE("Flush-SubTask-ServerServiceImpl"),
COMPACTION_SERVICE("Compaction-ServerServiceImpl"),
+ HITTER_SERVICE("Hitter"),
WAL_DAEMON("IoTDB-MultiFileLogNodeManager-Sync-Thread"),
WAL_FORCE_DAEMON("IoTDB-MultiFileLogNodeManager-Force-Thread"),
INDEX_SERVICE("Index-ServerServiceImpl"),
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHeavyHitters.java
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHeavyHitters.java
index b9c0989..1b720d8 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHeavyHitters.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHeavyHitters.java
@@ -29,6 +29,11 @@ import org.apache.iotdb.db.metadata.PartialPath;
public interface QueryHeavyHitters {
/**
+ * accept time series path list to estimate query frequency
+ */
+ void acceptQuerySeriesList(List<PartialPath> queryPaths);
+
+ /**
* 用于接收查询的时间序列
*/
void acceptQuerySeries(PartialPath queryPath);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHitterManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHitterManager.java
index cff5504..fb36ee9 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHitterManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/QueryHitterManager.java
@@ -19,19 +19,37 @@
package org.apache.iotdb.db.engine.heavyhitter;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.heavyhitter.hitter.DefaultHitter;
import org.apache.iotdb.db.engine.heavyhitter.hitter.HashMapHitter;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class QueryHitterManager {
+public class QueryHitterManager implements IService {
- private static final QueryHeavyHitters INSTANCE = loadQueryHitters();
+ private static final Logger logger =
+ LoggerFactory.getLogger(QueryHitterManager.class);
+ private static final QueryHitterManager INSTANCE = new QueryHitterManager();
+ private ExecutorService pool;
+ private QueryHeavyHitters queryHeavyHitters;
- public static QueryHeavyHitters getQueryHitter() {
+ public static QueryHitterManager getInstance() {
return INSTANCE;
}
- private static QueryHeavyHitters loadQueryHitters() {
+ public QueryHeavyHitters getQueryHitter() {
+ return queryHeavyHitters;
+ }
+
+ private QueryHeavyHitters loadQueryHitters() {
switch
(IoTDBDescriptor.getInstance().getConfig().getQueryHitterStrategy()) {
case HASH_STRATEGY:
return new
HashMapHitter(IoTDBDescriptor.getInstance().getConfig().getMaxHitterNum());
@@ -41,4 +59,65 @@ public class QueryHitterManager {
}
}
+ @Override
+ public void start() throws StartupException {
+ if (pool == null) {
+ pool = IoTDBThreadPoolFactory.newScheduledThreadPool(
+ 1, ThreadName.HITTER_SERVICE.getName());
+ }
+ if (queryHeavyHitters == null) {
+ queryHeavyHitters = loadQueryHitters();
+ }
+ logger.info("QueryHitterManager started.");
+ }
+
+ @Override
+ public void stop() {
+ if (pool != null) {
+ long startTime = System.currentTimeMillis();
+ while (!pool.isTerminated() || !pool.isTerminated()) {
+ int timeMillis = 0;
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ logger.error(
+ "QueryHitterManager {} shutdown",
+ ThreadName.HITTER_SERVICE.getName(),
+ e);
+ Thread.currentThread().interrupt();
+ }
+ timeMillis += 200;
+ long time = System.currentTimeMillis() - startTime;
+ if (timeMillis % 60_000 == 0) {
+ logger.warn("QueryHitterManager has wait for {} seconds to stop",
time / 1000);
+ }
+ }
+ }
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.HITTER_SERVICE;
+ }
+
+ public void submitTask(HitterTask hitterTask) {
+ if (pool != null && !pool.isTerminated()) {
+ pool.submit(hitterTask);
+ }
+ }
+
+ public class HitterTask implements Runnable {
+
+ private List<PartialPath> queryPaths;
+
+ public HitterTask(List<PartialPath> queryPaths) {
+ this.queryPaths = queryPaths;
+ }
+
+ @Override
+ public void run() {
+ getQueryHitter().acceptQuerySeriesList(queryPaths);
+ }
+ }
+
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
index 0bf8b18..3d23f32 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.engine.heavyhitter.hitter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.heavyhitter.QueryHeavyHitters;
@@ -37,12 +39,25 @@ import org.slf4j.LoggerFactory;
public class DefaultHitter implements QueryHeavyHitters {
private static final Logger logger =
LoggerFactory.getLogger(DefaultHitter.class);
+ protected final ReadWriteLock hitterLock = new ReentrantReadWriteLock();
public DefaultHitter(int maxHitterNum) {
}
@Override
+ public void acceptQuerySeriesList(List<PartialPath> queryPaths) {
+ hitterLock.writeLock().lock();
+ try {
+ for (PartialPath path : queryPaths) {
+ acceptQuerySeries(path);
+ }
+ } finally {
+ hitterLock.writeLock().unlock();
+ }
+ }
+
+ @Override
public void acceptQuerySeries(PartialPath queryPath) {
// do nothing
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
index f9b735d..f097f5a 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/HashMapHitter.java
@@ -63,19 +63,26 @@ public class HashMapHitter implements QueryHeavyHitters {
}
@Override
- public void acceptQuerySeries(PartialPath queryPath) {
+ public void acceptQuerySeriesList(List<PartialPath> queryPaths) {
hitterLock.writeLock().lock();
try {
- if (queryPath == null) {
- return;
+ for (PartialPath path : queryPaths) {
+ acceptQuerySeries(path);
}
- counter.put(queryPath, counter.getOrDefault(queryPath, 0) + 1);
} finally {
hitterLock.writeLock().unlock();
}
}
@Override
+ public void acceptQuerySeries(PartialPath queryPath) {
+ if (queryPath == null) {
+ return;
+ }
+ counter.put(queryPath, counter.getOrDefault(queryPath, 0) + 1);
+ }
+
+ @Override
public List<PartialPath> getTopCompactionSeries(PartialPath sgName) throws
MetadataException {
hitterLock.writeLock().lock();
try {
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index f6f7157..e0a9698 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -593,6 +593,11 @@ public class PhysicalGenerator {
}
try {
deduplicate(queryPlan, fetchSize);
+ // estimate time series' query frequency
+ if (queryPlan instanceof RawDataQueryPlan) {
+
QueryHitterManager.getInstance().submitTask(QueryHitterManager.getInstance().new
HitterTask(
+ ((RawDataQueryPlan) queryPlan).getDeduplicatedPaths()));
+ }
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
@@ -681,9 +686,9 @@ public class PhysicalGenerator {
queryPlan.setDataTypes(dataTypes);
// add query to hitter
- for (PartialPath path: paths) {
- QueryHitterManager.getQueryHitter().acceptQuerySeries(path);
- }
+ // for (PartialPath path: paths) {
+ // QueryHitterManager.getQueryHitter().acceptQuerySeries(path);
+ // }
// deduplicate from here
if (queryPlan instanceof AlignByDevicePlan) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index a67b345..9401b43 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -27,7 +27,9 @@ import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.heavyhitter.QueryHitterManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.MManager;
@@ -112,6 +114,11 @@ public class IoTDB implements IoTDBMBean {
JMXService.registerMBean(getInstance(), mbeanName);
registerManager.register(StorageEngine.getInstance());
+ if (IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
+ == CompactionStrategy.HITTER_LEVEL_COMPACTION) {
+ registerManager.register(QueryHitterManager.getInstance());
+ }
+
// When registering statMonitor, we should start recovering some statistics
// with latest values stored
// Warn: registMonitor() method should be called after systemDataRecovery()
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 0d50afb..edf3539 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -38,6 +38,7 @@ public enum ServiceType {
UPGRADE_SERVICE("UPGRADE DataService", ""),
MERGE_SERVICE("Merge Manager", "Merge Manager"),
COMPACTION_SERVICE("Compaction Manager", "Compaction Manager"),
+ HITTER_SERVICE("Hitter Manager", "Hitter Manager"),
PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE",
"PERFORMANCE_STATISTIC_SERVICE"),
TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
CACHE_HIT_RATIO_DISPLAY_SERVICE("CACHE_HIT_RATIO_DISPLAY_SERVICE",