This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch query_module
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/query_module by this push:
new 2675ebf Add workload manager (#2520)
2675ebf is described below
commit 2675ebf206cf26eb4340f1a58d0f840ab87736e2
Author: liuxuxin <[email protected]>
AuthorDate: Thu Jan 21 20:29:25 2021 +0800
Add workload manager (#2520)
Add workload manager
---
grafana/pom.xml | 2 +-
jdbc/pom.xml | 2 +-
pom.xml | 6 +-
server/src/assembly/resources/conf/logback.xml | 22 ++++++
.../groupby/GroupByWithValueFilterDataSet.java | 31 +++++++++
.../groupby/GroupByWithoutValueFilterDataSet.java | 27 ++++++++
.../db/query/executor/AggregationExecutor.java | 68 +++++++++++++++++++
.../db/query/workloadmanager/WorkloadManager.java | 64 ++++++++++++++++++
.../queryrecord/AggregationQueryRecord.java | 49 ++++++++++++++
.../queryrecord/GroupByQueryRecord.java | 79 ++++++++++++++++++++++
.../workloadmanager/queryrecord/QueryRecord.java | 31 +++++++++
.../queryrecord/QueryRecordType.java | 5 ++
12 files changed, 381 insertions(+), 5 deletions(-)
diff --git a/grafana/pom.xml b/grafana/pom.xml
index 357350a..b605e65 100644
--- a/grafana/pom.xml
+++ b/grafana/pom.xml
@@ -165,7 +165,7 @@
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
- <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
/>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${start-class}</mainClass>
</transformer>
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index b44bb46..090fdb0 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -223,7 +223,7 @@
</goals>
</pluginExecutionFilter>
<action>
- <ignore />
+ <ignore/>
</action>
</pluginExecution>
</pluginExecutions>
diff --git a/pom.xml b/pom.xml
index c0a1dc4..96752d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,7 +139,7 @@
<sonar.exclusions>**/generated-sources</sonar.exclusions>
<!-- By default, the argLine is empty-->
<gson.version>2.8.6</gson.version>
- <argLine />
+ <argLine/>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim
@@ -599,7 +599,7 @@
<id>enforce-version-convergence</id>
<configuration>
<rules>
- <dependencyConvergence />
+ <dependencyConvergence/>
</rules>
</configuration>
<goals>
@@ -645,7 +645,7 @@
</requireJavaVersion>
<!-- Disabled for now as it breaks the ability
to build single modules -->
<!--reactorModuleConvergence/-->
- <banVulnerable
implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"
/>
+ <banVulnerable
implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
</rules>
</configuration>
</execution>
diff --git a/server/src/assembly/resources/conf/logback.xml
b/server/src/assembly/resources/conf/logback.xml
index 22a80d0..05bea04 100644
--- a/server/src/assembly/resources/conf/logback.xml
+++ b/server/src/assembly/resources/conf/logback.xml
@@ -251,6 +251,25 @@
<level>INFO</level>
</filter>
</appender>
+ <appender class="ch.qos.logback.core.rolling.RollingFileAppender"
name="QUERY_RECORD">
+ <file>${IOTDB_HOME}/logs/log_query_record.log</file>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+
<fileNamePattern>${IOTDB_HOME}/logs/log-query-record-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>10</maxIndex>
+ </rollingPolicy>
+ <triggeringPolicy
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>300MB</maxFileSize>
+ </triggeringPolicy>
+ <append>true</append>
+ <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+ <pattern>%d [%t] %-5p %C:%L - %m %n</pattern>
+ <charset>utf-8</charset>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ </appender>
<root level="info">
<appender-ref ref="FILEDEBUG"/>
<appender-ref ref="FILEWARN"/>
@@ -280,4 +299,7 @@
<logger level="info" name="QUERY_FREQUENCY">
<appender-ref ref="QUERY_FREQUENCY"/>
</logger>
+ <logger level="info" name="QUERY_RECORD">
+ <appender-ref ref="QUERY_RECORD"/>
+ </logger>
</configuration>
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index bd37495..8ce0202 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import java.util.stream.Collectors;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -40,6 +42,9 @@ import org.apache.iotdb.db.query.filter.TsFileFilter;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.db.query.workloadmanager.WorkloadManager;
+import
org.apache.iotdb.db.query.workloadmanager.queryrecord.GroupByQueryRecord;
+import org.apache.iotdb.db.query.workloadmanager.queryrecord.QueryRecord;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
@@ -91,16 +96,42 @@ public class GroupByWithValueFilterDataSet extends
GroupByEngineDataSet {
List<StorageGroupProcessor> list = StorageEngine.getInstance()
.mergeLock(paths.stream().map(p -> (PartialPath)
p).collect(Collectors.toList()));
+ WorkloadManager manager = WorkloadManager.getInstance();
+ Map<String, List<Integer>> deviceQueryIdxMap = new HashMap<>();
try {
for (int i = 0; i < paths.size(); i++) {
PartialPath path = (PartialPath) paths.get(i);
allDataReaderList
.add(getReaderByTime(path, groupByTimePlan, dataTypes.get(i),
context, null));
+ // Map the device id to the corresponding path indexes
+ if (deviceQueryIdxMap.containsKey(path.getDevice())) {
+ deviceQueryIdxMap.get(path.getDevice()).add(i);
+ } else {
+ deviceQueryIdxMap.put(path.getDevice(), new ArrayList<>());
+ deviceQueryIdxMap.get(path.getDevice()).add(i);
+ }
+ }
+
+ // Add the query records to the workload manager
+ for(String device: deviceQueryIdxMap.keySet()) {
+ List<Integer> pathIndexes = deviceQueryIdxMap.get(device);
+ List<String> sensors = new ArrayList<>();
+ List<String> ops = new ArrayList<>();
+ for(int idx: pathIndexes) {
+ PartialPath path = (PartialPath) paths.get(idx);
+ sensors.add(path.getMeasurement());
+ ops.add(groupByTimePlan.getDeduplicatedAggregations().get(idx));
+ }
+ QueryRecord record = new GroupByQueryRecord(device, sensors, ops,
groupByTimePlan.getStartTime(),
+ groupByTimePlan.getEndTime(), groupByTimePlan.getInterval(),
groupByTimePlan.getSlidingStep());
+ manager.addRecord(record);
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
}
+
+
}
protected TimeGenerator getTimeGenerator(IExpression expression,
QueryContext context,
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index a5f42a7..ff3226a 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -37,6 +37,9 @@ import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.factory.AggregateResultFactory;
import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.workloadmanager.WorkloadManager;
+import
org.apache.iotdb.db.query.workloadmanager.queryrecord.GroupByQueryRecord;
+import org.apache.iotdb.db.query.workloadmanager.queryrecord.QueryRecord;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
@@ -91,6 +94,8 @@ public class GroupByWithoutValueFilterDataSet extends
GroupByEngineDataSet {
.mergeLock(paths.stream().map(p -> (PartialPath)
p).collect(Collectors.toList()));
try {
// init resultIndexes, group result indexes by path
+ WorkloadManager manager = WorkloadManager.getInstance();
+ Map<String, List<Integer>> deviceQueryIdxMap = new HashMap<>();
for (int i = 0; i < paths.size(); i++) {
PartialPath path = (PartialPath) paths.get(i);
if (!pathExecutors.containsKey(path)) {
@@ -105,6 +110,28 @@ public class GroupByWithoutValueFilterDataSet extends
GroupByEngineDataSet {
.getAggrResultByName(groupByTimePlan.getDeduplicatedAggregations().get(i),
dataTypes.get(i), ascending);
pathExecutors.get(path).addAggregateResult(aggrResult);
+ // Map the device id to the corresponding query indexes
+ if (deviceQueryIdxMap.containsKey(path.getDevice())) {
+ deviceQueryIdxMap.get(path.getDevice()).add(i);
+ } else {
+ deviceQueryIdxMap.put(path.getDevice(), new ArrayList<>());
+ deviceQueryIdxMap.get(path.getDevice()).add(i);
+ }
+ }
+
+ // Add the query records to the workload manager
+ for(String device: deviceQueryIdxMap.keySet()) {
+ List<Integer> pathIndexes = deviceQueryIdxMap.get(device);
+ List<String> sensors = new ArrayList<>();
+ List<String> ops = new ArrayList<>();
+ for(int idx: pathIndexes) {
+ PartialPath path = (PartialPath) paths.get(idx);
+ sensors.add(path.getMeasurement());
+ ops.add(groupByTimePlan.getDeduplicatedAggregations().get(idx));
+ }
+ QueryRecord record = new GroupByQueryRecord(device, sensors, ops,
groupByTimePlan.getStartTime(),
+ groupByTimePlan.getEndTime(), groupByTimePlan.getInterval(),
groupByTimePlan.getSlidingStep());
+ manager.addRecord(record);
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 81dc0c9..b33fa19 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -47,6 +47,9 @@ import
org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.db.query.workloadmanager.WorkloadManager;
+import
org.apache.iotdb.db.query.workloadmanager.queryrecord.AggregationQueryRecord;
+import org.apache.iotdb.db.query.workloadmanager.queryrecord.QueryRecord;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -102,11 +105,44 @@ public class AggregationExecutor {
// TODO-Cluster: group the paths by storage group to reduce communications
List<StorageGroupProcessor> list = StorageEngine.getInstance()
.mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
+ WorkloadManager manager = WorkloadManager.getInstance();
+ // DeviceID -> MeasurementID -> List<Aggregation Operation>
+ Map<String, Map<String, List<String>>> deviceSensorMap = new HashMap<>();
try {
for (Map.Entry<PartialPath, List<Integer>> entry :
pathToAggrIndexesMap.entrySet()) {
aggregateOneSeries(entry, aggregateResultList,
aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()),
timeFilter,
context);
+
+ // Record the query in the map
+ if (!deviceSensorMap.containsKey(entry.getKey().getDevice())) {
+ deviceSensorMap.put(entry.getKey().getDevice(), new HashMap<>());
+ }
+ Map<String, List<String>> measurementOpMap =
deviceSensorMap.get(entry.getKey().getDevice());
+ if (!measurementOpMap.containsKey(entry.getKey().getMeasurement())) {
+ measurementOpMap.put(entry.getKey().getMeasurement(), new
ArrayList<>());
+ }
+ List<Integer> aggrIndexes = entry.getValue();
+ List<String> ops =
measurementOpMap.get(entry.getKey().getMeasurement());
+ for(int idx = 0; idx < aggrIndexes.size(); ++idx) {
+
ops.add(aggregationPlan.getDeduplicatedAggregations().get(aggrIndexes.get(idx)));
+ }
+ }
+
+ // Put the query records into the manager
+ for (String device: deviceSensorMap.keySet()) {
+ Map<String, List<String>> measurementOpMap =
deviceSensorMap.get(device);
+ List<String> curDeviceMeasurements = new ArrayList<>();
+ List<String> curDeviceOps = new ArrayList<>();
+ for (String measurement: measurementOpMap.keySet()) {
+ List<String> ops = measurementOpMap.get(measurement);
+ for(int i = 0; i < ops.size(); ++i) {
+ curDeviceMeasurements.add(measurement);
+ curDeviceOps.add(ops.get(i));
+ }
+ }
+ QueryRecord record = new AggregationQueryRecord(device,
curDeviceMeasurements, curDeviceOps);
+ manager.addRecord(record);
}
} finally {
StorageEngine.getInstance().mergeUnLock(list);
@@ -325,6 +361,38 @@ public class AggregationExecutor {
.getAggrResultByName(aggregations.get(i), type, ascending);
aggregateResults.add(result);
}
+ // Workload collection
+ WorkloadManager manager = WorkloadManager.getInstance();
+ // DeviceID -> MeasurementID -> List<Aggregation Operation>
+ Map<String, Map<String, List<String>>> deviceSensorMap = new HashMap<>();
+ // Record the query in the map
+ for(int i = 0; i < selectedSeries.size(); ++i) {
+ if (!deviceSensorMap.containsKey(selectedSeries.get(i).getDevice())) {
+ deviceSensorMap.put(selectedSeries.get(i).getDevice(), new
HashMap<>());
+ }
+ Map<String, List<String>> measurementOpMap =
deviceSensorMap.get(selectedSeries.get(i).getDevice());
+ if
(!measurementOpMap.containsKey(selectedSeries.get(i).getMeasurement())) {
+ measurementOpMap.put(selectedSeries.get(i).getMeasurement(), new
ArrayList<>());
+ }
+ List<String> opList =
measurementOpMap.get(selectedSeries.get(i).getMeasurement());
+ opList.add(aggregations.get(i));
+ }
+
+ // Put the query records to the manager
+ for (String device: deviceSensorMap.keySet()) {
+ Map<String, List<String>> measurementOpMap = deviceSensorMap.get(device);
+ List<String> curDeviceMeasurements = new ArrayList<>();
+ List<String> curDeviceOps = new ArrayList<>();
+ for (String measurement: measurementOpMap.keySet()) {
+ List<String> ops = measurementOpMap.get(measurement);
+ for(int i = 0; i < ops.size(); ++i) {
+ curDeviceMeasurements.add(measurement);
+ curDeviceOps.add(ops.get(i));
+ }
+ }
+ QueryRecord record = new AggregationQueryRecord(device,
curDeviceMeasurements, curDeviceOps);
+ manager.addRecord(record);
+ }
aggregateWithValueFilter(aggregateResults, timestampGenerator,
readersOfSelectedSeries);
return constructDataSet(aggregateResults, queryPlan);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/WorkloadManager.java
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/WorkloadManager.java
new file mode 100644
index 0000000..f325097
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/WorkloadManager.java
@@ -0,0 +1,64 @@
+package org.apache.iotdb.db.query.workloadmanager;
+
+import org.apache.iotdb.db.query.workloadmanager.queryrecord.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WorkloadManager {
+ List<QueryRecord> records = new ArrayList<>();
+ private final Logger QUERY_RECORD_LOGGER =
LoggerFactory.getLogger("QUERY_RECORD");
+ private final int RECORDS_NUM_THRESHOLD = 300;
+ private final ExecutorService flushExecutor =
Executors.newFixedThreadPool(1);
+
+ private WorkloadManager() {
+ }
+
+ private static class WorkloadManagerHolder {
+ private static final WorkloadManager INSTANCE = new WorkloadManager();
+ }
+
+ private class QueryRecordFlushTask implements Runnable{
+ List<QueryRecord> records;
+ Logger QUERY_RECORD_LOGGER;
+
+ private QueryRecordFlushTask(List<QueryRecord> r, Logger l) {
+ records = r;
+ QUERY_RECORD_LOGGER = l;
+ }
+
+ @Override
+ public void run() {
+ for (QueryRecord record : records) {
+ QUERY_RECORD_LOGGER.info(record.getSql());
+ }
+ }
+ }
+
+ public static WorkloadManager getInstance() {
+ return WorkloadManagerHolder.INSTANCE;
+ }
+
+ public synchronized void addAggregationRecord(String device, List<String>
sensors, List<String> ops) {
+ // add aggregation record
+ QueryRecord record = new AggregationQueryRecord(device, sensors, ops);
+ this.addRecord(record);
+ }
+
+ public synchronized void addGroupByQueryRecord(String device, List<String>
sensors, List<String> ops,
+ long startTime, long endTime,
long interval, long slidingStep) {
+ QueryRecord record = new GroupByQueryRecord(device, sensors, ops,
startTime, endTime, interval, slidingStep);
+ this.addRecord(record);
+ }
+
+ public synchronized void addRecord(QueryRecord record) {
+ records.add(record);
+ if (records.size() > RECORDS_NUM_THRESHOLD) {
+ flushExecutor.execute(new QueryRecordFlushTask(records,
QUERY_RECORD_LOGGER));
+ this.records = new ArrayList<>();
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/AggregationQueryRecord.java
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/AggregationQueryRecord.java
new file mode 100644
index 0000000..3772680
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/AggregationQueryRecord.java
@@ -0,0 +1,49 @@
+package org.apache.iotdb.db.query.workloadmanager.queryrecord;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class AggregationQueryRecord extends QueryRecord {
+
+ public AggregationQueryRecord(String device, List<String> sensorList,
List<String> opList) {
+ sensors = new ArrayList<>(sensorList);
+ ops = new ArrayList<>(opList);
+ this.timestamp = new Date().getTime();
+ this.device = device;
+ this.recordType = QueryRecordType.AGGREGATION;
+ recoverSql();
+ recoverSqlWithTimestamp();
+ }
+
+ private void recoverSql() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ");
+ for (int i = 0; i < sensors.size(); ++i) {
+ sb.append(ops.get(i) + "(" + sensors.get(i) + ")");
+ if (i != sensors.size() - 1)
+ sb.append(", ");
+ }
+ sb.append(" FROM ");
+ sb.append(device);
+ sql = sb.toString();
+ }
+
+ private void recoverSqlWithTimestamp() {
+ Date d = new Date();
+ d.setTime(timestamp);
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sqlWithTimestamp = df.format(d) + " " + sql;
+ }
+
+ public String getSql() {
+ return sql;
+ }
+
+ @Override
+ public String getSqlWithTimestamp() {
+ return sqlWithTimestamp;
+ }
+}
+
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/GroupByQueryRecord.java
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/GroupByQueryRecord.java
new file mode 100644
index 0000000..a134df8
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/GroupByQueryRecord.java
@@ -0,0 +1,79 @@
+package org.apache.iotdb.db.query.workloadmanager.queryrecord;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class GroupByQueryRecord extends QueryRecord {
+ long startTime;
+ long endTime;
+ long interval;
+ long slidingStep;
+
+ public GroupByQueryRecord(String device, List<String> sensors, List<String>
ops, long startTime, long endTime, long interval, long slidingStep) {
+ this.sensors = new ArrayList<>(sensors);
+ this.ops = new ArrayList<>(ops);
+ this.device = device;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.interval = interval;
+ this.slidingStep = slidingStep;
+ this.recordType = QueryRecordType.GROUP_BY;
+ this.timestamp = new Date().getTime();
+ recoverSql();
+ recoverSqlWithTimestamp();
+ }
+
+ public GroupByQueryRecord(String device, List<String> sensors, List<String>
ops, long startTime, long endTime, long interval) {
+ this(device, sensors, ops, startTime, endTime, interval, interval);
+ }
+
+ private void recoverSql() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SELECT ");
+ for(int i = 0; i < sensors.size(); ++i) {
+ sb.append(ops.get(i) + "(" + sensors.get(i) + ")");
+ if (i != sensors.size() - 1) {
+ sb.append(", ");
+ }
+ }
+
+ sb.append(" FROM " + device);
+ sb.append(" GROUP BY ([");
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ Date dStart = new Date();
+ dStart.setTime(startTime);
+
+ Date dEnd = new Date();
+ dEnd.setTime(endTime);
+ sb.append(df.format(dStart));
+ sb.append(", ");
+ sb.append(df.format(dEnd));
+ sb.append("), " + interval + "ms");
+ if (interval != slidingStep) {
+ sb.append(", " + slidingStep + "ms");
+ }
+ sb.append(")");
+
+ sql = sb.toString();
+ }
+
+ private void recoverSqlWithTimestamp() {
+ Date d = new Date();
+ d.setTime(timestamp);
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ sqlWithTimestamp = df.format(d) + " " + sql;
+ }
+
+ @Override
+ public String getSql() {
+ return sql;
+ }
+
+
+ @Override
+ public String getSqlWithTimestamp() {
+ return sqlWithTimestamp;
+ }
+}
\ No newline at end of file
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/QueryRecord.java
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/QueryRecord.java
new file mode 100644
index 0000000..c9aed4f
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/QueryRecord.java
@@ -0,0 +1,31 @@
+package org.apache.iotdb.db.query.workloadmanager.queryrecord;
+
+import java.util.List;
+
+public abstract class QueryRecord {
+ protected int hashcode = 0;
+ protected String device;
+ protected List<String> sensors;
+ protected List<String> ops;
+ protected QueryRecordType recordType;
+ String sql;
+ String sqlWithTimestamp;
+ long timestamp;
+
+
+ public abstract String getSql();
+
+ public abstract String getSqlWithTimestamp();
+
+ public QueryRecordType getRecordType() {
+ return recordType;
+ }
+
+ public List<String> getSensors() {
+ return sensors;
+ }
+
+ public String getDevice() {
+ return device;
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/QueryRecordType.java
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/QueryRecordType.java
new file mode 100644
index 0000000..2ae783a
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/QueryRecordType.java
@@ -0,0 +1,5 @@
+package org.apache.iotdb.db.query.workloadmanager.queryrecord;
+
+public enum QueryRecordType {
+ GROUP_BY, AGGREGATION
+}