This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch query_module
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/query_module by this push:
     new 2675ebf  Add workload manager (#2520)
2675ebf is described below

commit 2675ebf206cf26eb4340f1a58d0f840ab87736e2
Author: liuxuxin <[email protected]>
AuthorDate: Thu Jan 21 20:29:25 2021 +0800

    Add workload manager (#2520)
    
    Add workload manager
---
 grafana/pom.xml                                    |  2 +-
 jdbc/pom.xml                                       |  2 +-
 pom.xml                                            |  6 +-
 server/src/assembly/resources/conf/logback.xml     | 22 ++++++
 .../groupby/GroupByWithValueFilterDataSet.java     | 31 +++++++++
 .../groupby/GroupByWithoutValueFilterDataSet.java  | 27 ++++++++
 .../db/query/executor/AggregationExecutor.java     | 68 +++++++++++++++++++
 .../db/query/workloadmanager/WorkloadManager.java  | 64 ++++++++++++++++++
 .../queryrecord/AggregationQueryRecord.java        | 49 ++++++++++++++
 .../queryrecord/GroupByQueryRecord.java            | 79 ++++++++++++++++++++++
 .../workloadmanager/queryrecord/QueryRecord.java   | 31 +++++++++
 .../queryrecord/QueryRecordType.java               |  5 ++
 12 files changed, 381 insertions(+), 5 deletions(-)

diff --git a/grafana/pom.xml b/grafana/pom.xml
index 357350a..b605e65 100644
--- a/grafana/pom.xml
+++ b/grafana/pom.xml
@@ -165,7 +165,7 @@
                                     <transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                         
<resource>META-INF/spring.schemas</resource>
                                     </transformer>
-                                    <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
 />
+                                    <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                     <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                         <mainClass>${start-class}</mainClass>
                                     </transformer>
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index b44bb46..090fdb0 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -223,7 +223,7 @@
                                                 </goals>
                                             </pluginExecutionFilter>
                                             <action>
-                                                <ignore />
+                                                <ignore/>
                                             </action>
                                         </pluginExecution>
                                     </pluginExecutions>
diff --git a/pom.xml b/pom.xml
index c0a1dc4..96752d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,7 +139,7 @@
         <sonar.exclusions>**/generated-sources</sonar.exclusions>
         <!-- By default, the argLine is empty-->
         <gson.version>2.8.6</gson.version>
-        <argLine />
+        <argLine/>
     </properties>
     <!--
         if we claim dependencies in dependencyManagement, then we do not claim
@@ -599,7 +599,7 @@
                         <id>enforce-version-convergence</id>
                         <configuration>
                             <rules>
-                                <dependencyConvergence />
+                                <dependencyConvergence/>
                             </rules>
                         </configuration>
                         <goals>
@@ -645,7 +645,7 @@
                                 </requireJavaVersion>
                                 <!-- Disabled for now as it breaks the ability 
to build single modules -->
                                 <!--reactorModuleConvergence/-->
-                                <banVulnerable 
implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" 
/>
+                                <banVulnerable 
implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
                             </rules>
                         </configuration>
                     </execution>
diff --git a/server/src/assembly/resources/conf/logback.xml 
b/server/src/assembly/resources/conf/logback.xml
index 22a80d0..05bea04 100644
--- a/server/src/assembly/resources/conf/logback.xml
+++ b/server/src/assembly/resources/conf/logback.xml
@@ -251,6 +251,25 @@
             <level>INFO</level>
         </filter>
     </appender>
+    <appender class="ch.qos.logback.core.rolling.RollingFileAppender" 
name="QUERY_RECORD">
+        <file>${IOTDB_HOME}/logs/log_query_record.log</file>
+        <rollingPolicy 
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            
<fileNamePattern>${IOTDB_HOME}/logs/log-query-record-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>10</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy 
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>300MB</maxFileSize>
+        </triggeringPolicy>
+        <append>true</append>
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>%d [%t] %-5p %C:%L - %m %n</pattern>
+            <charset>utf-8</charset>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+    </appender>
     <root level="info">
         <appender-ref ref="FILEDEBUG"/>
         <appender-ref ref="FILEWARN"/>
@@ -280,4 +299,7 @@
     <logger level="info" name="QUERY_FREQUENCY">
         <appender-ref ref="QUERY_FREQUENCY"/>
     </logger>
+    <logger level="info" name="QUERY_RECORD">
+        <appender-ref ref="QUERY_RECORD"/>
+    </logger>
 </configuration>
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index bd37495..8ce0202 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.stream.Collectors;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -40,6 +42,9 @@ import org.apache.iotdb.db.query.filter.TsFileFilter;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.db.query.workloadmanager.WorkloadManager;
+import 
org.apache.iotdb.db.query.workloadmanager.queryrecord.GroupByQueryRecord;
+import org.apache.iotdb.db.query.workloadmanager.queryrecord.QueryRecord;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
@@ -91,16 +96,42 @@ public class GroupByWithValueFilterDataSet extends 
GroupByEngineDataSet {
 
     List<StorageGroupProcessor> list = StorageEngine.getInstance()
         .mergeLock(paths.stream().map(p -> (PartialPath) 
p).collect(Collectors.toList()));
+    WorkloadManager manager = WorkloadManager.getInstance();
+    Map<String, List<Integer>> deviceQueryIdxMap = new HashMap<>();
     try {
       for (int i = 0; i < paths.size(); i++) {
         PartialPath path = (PartialPath) paths.get(i);
         allDataReaderList
             .add(getReaderByTime(path, groupByTimePlan, dataTypes.get(i), 
context, null));
+        // Map the device id to the corresponding path indexes
+        if (deviceQueryIdxMap.containsKey(path.getDevice())) {
+          deviceQueryIdxMap.get(path.getDevice()).add(i);
+        } else {
+          deviceQueryIdxMap.put(path.getDevice(), new ArrayList<>());
+          deviceQueryIdxMap.get(path.getDevice()).add(i);
+        }
+      }
+
+      // Add the query records to the workload manager
+      for(String device: deviceQueryIdxMap.keySet()) {
+        List<Integer> pathIndexes = deviceQueryIdxMap.get(device);
+        List<String> sensors = new ArrayList<>();
+        List<String> ops = new ArrayList<>();
+        for(int idx: pathIndexes) {
+          PartialPath path = (PartialPath) paths.get(idx);
+          sensors.add(path.getMeasurement());
+          ops.add(groupByTimePlan.getDeduplicatedAggregations().get(idx));
+        }
+        QueryRecord record = new GroupByQueryRecord(device, sensors, ops, 
groupByTimePlan.getStartTime(),
+                groupByTimePlan.getEndTime(), groupByTimePlan.getInterval(), 
groupByTimePlan.getSlidingStep());
+        manager.addRecord(record);
       }
     } finally {
       StorageEngine.getInstance().mergeUnLock(list);
     }
 
+
+
   }
 
   protected TimeGenerator getTimeGenerator(IExpression expression, 
QueryContext context,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index a5f42a7..ff3226a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -37,6 +37,9 @@ import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.factory.AggregateResultFactory;
 import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.workloadmanager.WorkloadManager;
+import 
org.apache.iotdb.db.query.workloadmanager.queryrecord.GroupByQueryRecord;
+import org.apache.iotdb.db.query.workloadmanager.queryrecord.QueryRecord;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
@@ -91,6 +94,8 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
         .mergeLock(paths.stream().map(p -> (PartialPath) 
p).collect(Collectors.toList()));
     try {
       // init resultIndexes, group result indexes by path
+      WorkloadManager manager = WorkloadManager.getInstance();
+      Map<String, List<Integer>> deviceQueryIdxMap = new HashMap<>();
       for (int i = 0; i < paths.size(); i++) {
         PartialPath path = (PartialPath) paths.get(i);
         if (!pathExecutors.containsKey(path)) {
@@ -105,6 +110,28 @@ public class GroupByWithoutValueFilterDataSet extends 
GroupByEngineDataSet {
             
.getAggrResultByName(groupByTimePlan.getDeduplicatedAggregations().get(i),
                 dataTypes.get(i), ascending);
         pathExecutors.get(path).addAggregateResult(aggrResult);
+        // Map the device id to the corresponding query indexes
+        if (deviceQueryIdxMap.containsKey(path.getDevice())) {
+          deviceQueryIdxMap.get(path.getDevice()).add(i);
+        } else {
+          deviceQueryIdxMap.put(path.getDevice(), new ArrayList<>());
+          deviceQueryIdxMap.get(path.getDevice()).add(i);
+        }
+      }
+
+      // Add the query records to the workload manager
+      for(String device: deviceQueryIdxMap.keySet()) {
+        List<Integer> pathIndexes = deviceQueryIdxMap.get(device);
+        List<String> sensors = new ArrayList<>();
+        List<String> ops = new ArrayList<>();
+        for(int idx: pathIndexes) {
+          PartialPath path = (PartialPath) paths.get(idx);
+          sensors.add(path.getMeasurement());
+          ops.add(groupByTimePlan.getDeduplicatedAggregations().get(idx));
+        }
+        QueryRecord record = new GroupByQueryRecord(device, sensors, ops, 
groupByTimePlan.getStartTime(),
+                groupByTimePlan.getEndTime(), groupByTimePlan.getInterval(), 
groupByTimePlan.getSlidingStep());
+        manager.addRecord(record);
       }
     } finally {
       StorageEngine.getInstance().mergeUnLock(list);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 81dc0c9..b33fa19 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -47,6 +47,9 @@ import 
org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp;
 import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator;
+import org.apache.iotdb.db.query.workloadmanager.WorkloadManager;
+import 
org.apache.iotdb.db.query.workloadmanager.queryrecord.AggregationQueryRecord;
+import org.apache.iotdb.db.query.workloadmanager.queryrecord.QueryRecord;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -102,11 +105,44 @@ public class AggregationExecutor {
     // TODO-Cluster: group the paths by storage group to reduce communications
     List<StorageGroupProcessor> list = StorageEngine.getInstance()
         .mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet()));
+    WorkloadManager manager = WorkloadManager.getInstance();
+    // DeviceID -> MeasurementID -> List<Aggregation Operation>
+    Map<String, Map<String, List<String>>> deviceSensorMap = new HashMap<>();
     try {
       for (Map.Entry<PartialPath, List<Integer>> entry : 
pathToAggrIndexesMap.entrySet()) {
         aggregateOneSeries(entry, aggregateResultList,
             
aggregationPlan.getAllMeasurementsInDevice(entry.getKey().getDevice()), 
timeFilter,
             context);
+
+        // Record the query in the map
+        if (!deviceSensorMap.containsKey(entry.getKey().getDevice())) {
+          deviceSensorMap.put(entry.getKey().getDevice(), new HashMap<>());
+        }
+        Map<String, List<String>> measurementOpMap = 
deviceSensorMap.get(entry.getKey().getDevice());
+        if (!measurementOpMap.containsKey(entry.getKey().getMeasurement())) {
+          measurementOpMap.put(entry.getKey().getMeasurement(), new 
ArrayList<>());
+        }
+        List<Integer> aggrIndexes = entry.getValue();
+        List<String> ops = 
measurementOpMap.get(entry.getKey().getMeasurement());
+        for(int idx = 0; idx < aggrIndexes.size(); ++idx) {
+          
ops.add(aggregationPlan.getDeduplicatedAggregations().get(aggrIndexes.get(idx)));
+        }
+      }
+
+      // Put the query records into the manager
+      for (String device: deviceSensorMap.keySet()) {
+        Map<String, List<String>> measurementOpMap = 
deviceSensorMap.get(device);
+        List<String> curDeviceMeasurements = new ArrayList<>();
+        List<String> curDeviceOps = new ArrayList<>();
+        for (String measurement: measurementOpMap.keySet()) {
+          List<String> ops = measurementOpMap.get(measurement);
+          for(int i = 0; i < ops.size(); ++i) {
+            curDeviceMeasurements.add(measurement);
+            curDeviceOps.add(ops.get(i));
+          }
+        }
+        QueryRecord record = new AggregationQueryRecord(device, 
curDeviceMeasurements, curDeviceOps);
+        manager.addRecord(record);
       }
     } finally {
       StorageEngine.getInstance().mergeUnLock(list);
@@ -325,6 +361,38 @@ public class AggregationExecutor {
           .getAggrResultByName(aggregations.get(i), type, ascending);
       aggregateResults.add(result);
     }
+    // Workload collection
+    WorkloadManager manager = WorkloadManager.getInstance();
+    // DeviceID -> MeasurementID -> List<Aggregation Operation>
+    Map<String, Map<String, List<String>>> deviceSensorMap = new HashMap<>();
+    // Record the query in the map
+    for(int i = 0; i < selectedSeries.size(); ++i) {
+      if (!deviceSensorMap.containsKey(selectedSeries.get(i).getDevice())) {
+        deviceSensorMap.put(selectedSeries.get(i).getDevice(), new 
HashMap<>());
+      }
+      Map<String, List<String>> measurementOpMap = 
deviceSensorMap.get(selectedSeries.get(i).getDevice());
+      if 
(!measurementOpMap.containsKey(selectedSeries.get(i).getMeasurement())) {
+        measurementOpMap.put(selectedSeries.get(i).getMeasurement(), new 
ArrayList<>());
+      }
+      List<String> opList = 
measurementOpMap.get(selectedSeries.get(i).getMeasurement());
+      opList.add(aggregations.get(i));
+    }
+
+    // Put the query records to the manager
+    for (String device: deviceSensorMap.keySet()) {
+      Map<String, List<String>> measurementOpMap = deviceSensorMap.get(device);
+      List<String> curDeviceMeasurements = new ArrayList<>();
+      List<String> curDeviceOps = new ArrayList<>();
+      for (String measurement: measurementOpMap.keySet()) {
+        List<String> ops = measurementOpMap.get(measurement);
+        for(int i = 0; i < ops.size(); ++i) {
+          curDeviceMeasurements.add(measurement);
+          curDeviceOps.add(ops.get(i));
+        }
+      }
+      QueryRecord record = new AggregationQueryRecord(device, 
curDeviceMeasurements, curDeviceOps);
+      manager.addRecord(record);
+    }
     aggregateWithValueFilter(aggregateResults, timestampGenerator, 
readersOfSelectedSeries);
     return constructDataSet(aggregateResults, queryPlan);
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/WorkloadManager.java
 
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/WorkloadManager.java
new file mode 100644
index 0000000..f325097
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/WorkloadManager.java
@@ -0,0 +1,64 @@
+package org.apache.iotdb.db.query.workloadmanager;
+
+import org.apache.iotdb.db.query.workloadmanager.queryrecord.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WorkloadManager {
+  List<QueryRecord> records = new ArrayList<>();
+  private final Logger QUERY_RECORD_LOGGER = 
LoggerFactory.getLogger("QUERY_RECORD");
+  private final int RECORDS_NUM_THRESHOLD = 300;
+  private final ExecutorService flushExecutor = 
Executors.newFixedThreadPool(1);
+
+  private WorkloadManager() {
+  }
+
+  private static class WorkloadManagerHolder {
+    private static final WorkloadManager INSTANCE = new WorkloadManager();
+  }
+
+  private class QueryRecordFlushTask implements Runnable{
+    List<QueryRecord> records;
+    Logger QUERY_RECORD_LOGGER;
+
+    private QueryRecordFlushTask(List<QueryRecord> r, Logger l) {
+      records = r;
+      QUERY_RECORD_LOGGER = l;
+    }
+
+    @Override
+    public void run() {
+      for (QueryRecord record : records) {
+        QUERY_RECORD_LOGGER.info(record.getSql());
+      }
+    }
+  }
+
+  public static WorkloadManager getInstance() {
+    return WorkloadManagerHolder.INSTANCE;
+  }
+
+  public synchronized void addAggregationRecord(String device, List<String> 
sensors, List<String> ops) {
+    // add aggregation record
+    QueryRecord record = new AggregationQueryRecord(device, sensors, ops);
+    this.addRecord(record);
+  }
+
+  public synchronized void addGroupByQueryRecord(String device, List<String> 
sensors, List<String> ops,
+                                                 long startTime, long endTime, 
long interval, long slidingStep) {
+    QueryRecord record = new GroupByQueryRecord(device, sensors, ops, 
startTime, endTime, interval, slidingStep);
+    this.addRecord(record);
+  }
+
+  public synchronized void addRecord(QueryRecord record) {
+    records.add(record);
+    if (records.size() > RECORDS_NUM_THRESHOLD) {
+      flushExecutor.execute(new QueryRecordFlushTask(records, 
QUERY_RECORD_LOGGER));
+      this.records = new ArrayList<>();
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/AggregationQueryRecord.java
 
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/AggregationQueryRecord.java
new file mode 100644
index 0000000..3772680
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/AggregationQueryRecord.java
@@ -0,0 +1,49 @@
+package org.apache.iotdb.db.query.workloadmanager.queryrecord;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class AggregationQueryRecord extends QueryRecord {
+
+  public AggregationQueryRecord(String device, List<String> sensorList, 
List<String> opList) {
+    sensors = new ArrayList<>(sensorList);
+    ops = new ArrayList<>(opList);
+    this.timestamp = new Date().getTime();
+    this.device = device;
+    this.recordType = QueryRecordType.AGGREGATION;
+    recoverSql();
+    recoverSqlWithTimestamp();
+  }
+
+  private void recoverSql() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("SELECT ");
+    for (int i = 0; i < sensors.size(); ++i) {
+      sb.append(ops.get(i) + "(" + sensors.get(i) + ")");
+      if (i != sensors.size() - 1)
+        sb.append(", ");
+    }
+    sb.append(" FROM ");
+    sb.append(device);
+    sql = sb.toString();
+  }
+
+  private void recoverSqlWithTimestamp() {
+    Date d = new Date();
+    d.setTime(timestamp);
+    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    sqlWithTimestamp = df.format(d) + " " + sql;
+  }
+
+  public String getSql() {
+    return sql;
+  }
+
+  @Override
+  public String getSqlWithTimestamp() {
+    return sqlWithTimestamp;
+  }
+}
+
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/GroupByQueryRecord.java
 
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/GroupByQueryRecord.java
new file mode 100644
index 0000000..a134df8
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/GroupByQueryRecord.java
@@ -0,0 +1,79 @@
+package org.apache.iotdb.db.query.workloadmanager.queryrecord;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class GroupByQueryRecord extends QueryRecord {
+  long startTime;
+  long endTime;
+  long interval;
+  long slidingStep;
+
+  public GroupByQueryRecord(String device, List<String> sensors, List<String> 
ops, long startTime, long endTime, long interval, long slidingStep) {
+    this.sensors = new ArrayList<>(sensors);
+    this.ops = new ArrayList<>(ops);
+    this.device = device;
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.interval = interval;
+    this.slidingStep = slidingStep;
+    this.recordType = QueryRecordType.GROUP_BY;
+    this.timestamp = new Date().getTime();
+    recoverSql();
+    recoverSqlWithTimestamp();
+  }
+
+  public GroupByQueryRecord(String device, List<String> sensors, List<String> 
ops, long startTime, long endTime, long interval) {
+    this(device, sensors, ops, startTime, endTime, interval, interval);
+  }
+
+  private void recoverSql() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("SELECT ");
+    for(int i = 0; i < sensors.size(); ++i) {
+      sb.append(ops.get(i) + "(" + sensors.get(i) +  ")");
+      if (i != sensors.size() - 1) {
+        sb.append(", ");
+      }
+    }
+
+    sb.append(" FROM " + device);
+    sb.append(" GROUP BY ([");
+    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    Date dStart = new Date();
+    dStart.setTime(startTime);
+
+    Date dEnd = new Date();
+    dEnd.setTime(endTime);
+    sb.append(df.format(dStart));
+    sb.append(", ");
+    sb.append(df.format(dEnd));
+    sb.append("), " + interval + "ms");
+    if (interval != slidingStep) {
+      sb.append(", " + slidingStep + "ms");
+    }
+    sb.append(")");
+
+    sql = sb.toString();
+  }
+
+  private void recoverSqlWithTimestamp() {
+    Date d = new Date();
+    d.setTime(timestamp);
+    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    sqlWithTimestamp = df.format(d) + " " + sql;
+  }
+
+  @Override
+  public String getSql() {
+    return sql;
+  }
+
+
+  @Override
+  public String getSqlWithTimestamp() {
+    return sqlWithTimestamp;
+  }
+}
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/QueryRecord.java
 
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/QueryRecord.java
new file mode 100644
index 0000000..c9aed4f
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/QueryRecord.java
@@ -0,0 +1,31 @@
+package org.apache.iotdb.db.query.workloadmanager.queryrecord;
+
+import java.util.List;
+
+public abstract class QueryRecord {
+  protected int hashcode = 0;
+  protected String device;
+  protected List<String> sensors;
+  protected List<String> ops;
+  protected QueryRecordType recordType;
+  String sql;
+  String sqlWithTimestamp;
+  long timestamp;
+
+
+  public abstract String getSql();
+
+  public abstract String getSqlWithTimestamp();
+
+  public QueryRecordType getRecordType() {
+    return recordType;
+  }
+
+  public List<String> getSensors() {
+    return sensors;
+  }
+
+  public String getDevice() {
+    return device;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/QueryRecordType.java
 
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/QueryRecordType.java
new file mode 100644
index 0000000..2ae783a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/workloadmanager/queryrecord/QueryRecordType.java
@@ -0,0 +1,5 @@
+package org.apache.iotdb.db.query.workloadmanager.queryrecord;
+
+public enum QueryRecordType {
+  GROUP_BY, AGGREGATION
+}

Reply via email to