This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch hitter_compaction_mto_master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/hitter_compaction_mto_master
by this push:
new bc1d9cc finish default heavy hitter strategy
bc1d9cc is described below
commit bc1d9cc0e8c19a7b5128534159a10e6447639783
Author: EJTTianyu <[email protected]>
AuthorDate: Tue Apr 27 21:52:48 2021 +0800
finish default heavy hitter strategy
---
.../resources/conf/iotdb-engine.properties | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 35 +++++++++++++
.../db/engine/compaction/CompactionStrategy.java | 4 ++
.../HitterLevelCompactionTsFileManagement.java} | 17 ++-----
.../QueryHeavyHitters.java} | 29 +++++------
.../QueryHitterManager.java} | 27 +++++-----
.../QueryHitterStrategy.java} | 20 ++------
.../heavyhitter/hitters/DefaultHitter.java | 59 ++++++++++++++++++++++
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 28 ++++++++++
9 files changed, 164 insertions(+), 57 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 5299d78..cf4da38 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -364,7 +364,7 @@ timestamp_precision=ms
####################
### Merge Configurations
####################
-# LEVEL_COMPACTION, NO_COMPACTION
+# LEVEL_COMPACTION, NO_COMPACTION, HITTER_LEVEL_COMPACTION
# Datatype: CompactionStrategy
# compaction_strategy=LEVEL_COMPACTION
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index ee83180..bd3ea59 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.conf;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.engine.compaction.heavyhitter.QueryHitterStrategy;
import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.exception.LoadConfigurationException;
@@ -315,6 +316,15 @@ public class IoTDBConfig {
/** LEVEL_COMPACTION, NO_COMPACTION */
private CompactionStrategy compactionStrategy =
CompactionStrategy.LEVEL_COMPACTION;
+ /** Query hitter strategy */
+ private QueryHitterStrategy queryHitterStrategy = QueryHitterStrategy.DEFAULT_STRATEGY;
+
+ /** max number of query paths the hitter contains */
+ private int maxHitterNum = 5000;
+
+ /** size ratio of the hitter level merge */
+ private int sizeRatio = 2;
+
/**
* Works when the compaction_strategy is LEVEL_COMPACTION. Whether to merge
unseq files into seq
* files or not.
@@ -1466,6 +1476,31 @@ public class IoTDBConfig {
this.compactionStrategy = compactionStrategy;
}
+  /**
+   * @return the strategy that QueryHitterManager switches on to choose the query heavy hitter
+   *     implementation
+   */
+  public QueryHitterStrategy getQueryHitterStrategy() {
+    return this.queryHitterStrategy;
+  }
+
+  /**
+   * @param queryHitterStrategy the query hitter strategy to store in this configuration
+   */
+  public void setQueryHitterStrategy(QueryHitterStrategy queryHitterStrategy) {
+    this.queryHitterStrategy = queryHitterStrategy;
+  }
+
+  /**
+   * @return the max number of query paths the hitter contains; handed to the hitter's constructor
+   *     by QueryHitterManager
+   */
+  public int getMaxHitterNum() {
+    return this.maxHitterNum;
+  }
+
+  /**
+   * @param maxHitterNum the max number of query paths the hitter contains
+   */
+  public void setMaxHitterNum(int maxHitterNum) {
+    this.maxHitterNum = maxHitterNum;
+  }
+
+  /**
+   * @return the size ratio of the hitter level merge
+   */
+  public int getSizeRatio() {
+    return this.sizeRatio;
+  }
+
+  /**
+   * @param sizeRatio the size ratio of the hitter level merge
+   */
+  public void setSizeRatio(int sizeRatio) {
+    this.sizeRatio = sizeRatio;
+  }
+
public boolean isEnableUnseqCompaction() {
return enableUnseqCompaction;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
index 96ec9f9..ba98370 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
@@ -19,17 +19,21 @@
package org.apache.iotdb.db.engine.compaction;
+import
org.apache.iotdb.db.engine.compaction.heavyhitter.HitterLevelCompactionTsFileManagement;
import
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
import org.apache.iotdb.db.engine.compaction.no.NoCompactionTsFileManagement;
public enum CompactionStrategy {
LEVEL_COMPACTION,
+ HITTER_LEVEL_COMPACTION,
NO_COMPACTION;
public TsFileManagement getTsFileManagement(String storageGroupName, String
storageGroupDir) {
switch (this) {
case LEVEL_COMPACTION:
return new LevelCompactionTsFileManagement(storageGroupName,
storageGroupDir);
+ case HITTER_LEVEL_COMPACTION:
+ return new HitterLevelCompactionTsFileManagement(storageGroupName,
storageGroupDir);
case NO_COMPACTION:
default:
return new NoCompactionTsFileManagement(storageGroupName,
storageGroupDir);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/HitterLevelCompactionTsFileManagement.java
similarity index 61%
copy from
server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
copy to
server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/HitterLevelCompactionTsFileManagement.java
index 96ec9f9..c9b60a9 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/HitterLevelCompactionTsFileManagement.java
@@ -17,22 +17,13 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.compaction;
+package org.apache.iotdb.db.engine.compaction.heavyhitter;
import
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
-import org.apache.iotdb.db.engine.compaction.no.NoCompactionTsFileManagement;
-public enum CompactionStrategy {
- LEVEL_COMPACTION,
- NO_COMPACTION;
+public class HitterLevelCompactionTsFileManagement extends
LevelCompactionTsFileManagement {
- public TsFileManagement getTsFileManagement(String storageGroupName, String
storageGroupDir) {
- switch (this) {
- case LEVEL_COMPACTION:
- return new LevelCompactionTsFileManagement(storageGroupName,
storageGroupDir);
- case NO_COMPACTION:
- default:
- return new NoCompactionTsFileManagement(storageGroupName,
storageGroupDir);
- }
+ public HitterLevelCompactionTsFileManagement(String storageGroupName, String
storageGroupDir) {
+ super(storageGroupName, storageGroupDir);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/QueryHeavyHitters.java
similarity index 55%
copy from
server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
copy to
server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/QueryHeavyHitters.java
index 96ec9f9..9b4c590 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/QueryHeavyHitters.java
@@ -17,22 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.compaction;
+package org.apache.iotdb.db.engine.compaction.heavyhitter;
-import
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
-import org.apache.iotdb.db.engine.compaction.no.NoCompactionTsFileManagement;
+import java.util.List;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
-public enum CompactionStrategy {
- LEVEL_COMPACTION,
- NO_COMPACTION;
+public interface QueryHeavyHitters {
- public TsFileManagement getTsFileManagement(String storageGroupName, String
storageGroupDir) {
- switch (this) {
- case LEVEL_COMPACTION:
- return new LevelCompactionTsFileManagement(storageGroupName,
storageGroupDir);
- case NO_COMPACTION:
- default:
- return new NoCompactionTsFileManagement(storageGroupName,
storageGroupDir);
- }
- }
+ /**
+ * accept time series path to estimate query frequency
+ */
+ void acceptQuerySeries(PartialPath queryPath);
+
+ /**
+ * obtain the time series with the highest query revenue
+ */
+ List<PartialPath> getTopCompactionSeries(PartialPath sgName) throws
MetadataException;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/QueryHitterManager.java
similarity index 56%
copy from
server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
copy to
server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/QueryHitterManager.java
index 96ec9f9..a525f5e 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/QueryHitterManager.java
@@ -17,22 +17,25 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.compaction;
+package org.apache.iotdb.db.engine.compaction.heavyhitter;
-import
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
-import org.apache.iotdb.db.engine.compaction.no.NoCompactionTsFileManagement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.heavyhitter.hitters.DefaultHitter;
-public enum CompactionStrategy {
- LEVEL_COMPACTION,
- NO_COMPACTION;
+public class QueryHitterManager {
- public TsFileManagement getTsFileManagement(String storageGroupName, String
storageGroupDir) {
- switch (this) {
- case LEVEL_COMPACTION:
- return new LevelCompactionTsFileManagement(storageGroupName,
storageGroupDir);
- case NO_COMPACTION:
+ private static final QueryHeavyHitters INSTANCE = loadQueryHitters();
+
+ public static QueryHeavyHitters getQueryHitter() {
+ return INSTANCE;
+ }
+
+ private static QueryHeavyHitters loadQueryHitters() {
+ switch
(IoTDBDescriptor.getInstance().getConfig().getQueryHitterStrategy()) {
+ case DEFAULT_STRATEGY:
default:
- return new NoCompactionTsFileManagement(storageGroupName,
storageGroupDir);
+ return new
DefaultHitter(IoTDBDescriptor.getInstance().getConfig().getMaxHitterNum());
}
}
+
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/QueryHitterStrategy.java
similarity index 55%
copy from
server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
copy to
server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/QueryHitterStrategy.java
index 96ec9f9..16dabbc 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/QueryHitterStrategy.java
@@ -17,22 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.compaction;
+package org.apache.iotdb.db.engine.compaction.heavyhitter;
-import
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
-import org.apache.iotdb.db.engine.compaction.no.NoCompactionTsFileManagement;
+public enum QueryHitterStrategy {
+ // test query hitter
+ DEFAULT_STRATEGY
-public enum CompactionStrategy {
- LEVEL_COMPACTION,
- NO_COMPACTION;
-
- public TsFileManagement getTsFileManagement(String storageGroupName, String
storageGroupDir) {
- switch (this) {
- case LEVEL_COMPACTION:
- return new LevelCompactionTsFileManagement(storageGroupName,
storageGroupDir);
- case NO_COMPACTION:
- default:
- return new NoCompactionTsFileManagement(storageGroupName,
storageGroupDir);
- }
- }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/hitters/DefaultHitter.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/hitters/DefaultHitter.java
new file mode 100644
index 0000000..29fbccf
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/heavyhitter/hitters/DefaultHitter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.heavyhitter.hitters;
+
+import java.util.List;
+import org.apache.iotdb.db.engine.compaction.heavyhitter.QueryHeavyHitters;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.utils.MergeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default {@link QueryHeavyHitters} implementation. It records nothing about incoming queries and
+ * always proposes the first per-device group of series, as returned by
+ * {@code MergeUtils.splitPathsByDevice}, for hitter compaction.
+ */
+public class DefaultHitter implements QueryHeavyHitters {
+
+  private static final Logger logger = LoggerFactory.getLogger(DefaultHitter.class);
+
+  /**
+   * @param maxHitterNum not used by this implementation
+   */
+  public DefaultHitter(int maxHitterNum) {
+    // this class has no fields to initialise
+  }
+
+  /**
+   * Ignores the given path; this implementation keeps no query statistics.
+   *
+   * @param queryPath the queried series path (unused)
+   */
+  @Override
+  public void acceptQuerySeries(PartialPath queryPath) {
+    // do nothing
+  }
+
+  /**
+   * Fetches all time series paths under {@code sgName}, splits them by device and returns the
+   * first resulting group.
+   *
+   * @param sgName path passed to {@code MManager#getAllTimeseriesPath}
+   * @return the first per-device group of series, or {@code null} when the split yields no group
+   * @throws MetadataException propagated from the metadata lookup
+   */
+  @Override
+  public List<PartialPath> getTopCompactionSeries(PartialPath sgName) throws MetadataException {
+    List<List<PartialPath>> seriesByDevice =
+        MergeUtils.splitPathsByDevice(MManager.getInstance().getAllTimeseriesPath(sgName));
+    if (seriesByDevice.isEmpty()) {
+      // callers receive null (not an empty list) when nothing is available
+      return null;
+    }
+    List<PartialPath> firstDeviceSeries = seriesByDevice.get(0);
+    logger.info("default hitter, top compaction device:{}", firstDeviceSeries.get(0).getDevice());
+    return firstDeviceSeries;
+  }
+}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 41ed225..c1067bd 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.tsfile.write.writer;
+import java.util.Set;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.MetaMarker;
@@ -426,6 +427,33 @@ public class TsFileIOWriter {
}
}
+  /**
+   * Removes the ChunkMetadata of every series contained in {@code mergedPaths}, then removes each
+   * chunk group that is left without any ChunkMetadata (a group that was already empty is removed
+   * as well).
+   *
+   * @param mergedPaths series paths, built from device id and measurement id, whose chunk
+   *     metadata must be dropped
+   */
+  public void filterChunksHitter(Set<Path> mergedPaths) {
+    Iterator<ChunkGroupMetadata> groupIterator = chunkGroupMetadataList.iterator();
+    while (groupIterator.hasNext()) {
+      ChunkGroupMetadata group = groupIterator.next();
+      String device = group.getDevice();
+      // flips to true as soon as one chunk of this group survives the filter
+      boolean keepsAnyChunk = false;
+      for (Iterator<ChunkMetadata> chunkIterator = group.getChunkMetadataList().iterator();
+          chunkIterator.hasNext(); ) {
+        Path seriesPath = new Path(device, chunkIterator.next().getMeasurementUid());
+        if (mergedPaths.contains(seriesPath)) {
+          chunkIterator.remove();
+        } else {
+          keepsAnyChunk = true;
+        }
+      }
+      if (!keepsAnyChunk) {
+        groupIterator.remove();
+      }
+    }
+  }
+
public void writePlanIndices() throws IOException {
ReadWriteIOUtils.write(MetaMarker.OPERATION_INDEX_RANGE,
out.wrapAsStream());
ReadWriteIOUtils.write(minPlanIndex, out.wrapAsStream());