This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch clusterQueryOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5e415497a9e7a513eb7d290a562c7f3ae014991e
Author: Alima777 <[email protected]>
AuthorDate: Tue Mar 9 10:18:18 2021 +0800

    design and pseudo code
---
 .../cluster/query/reader/ClusterReaderFactory.java |  1 +
 .../dataset/RawQueryDataSetWithValueFilter.java    |  8 +--
 .../RemoteRawQueryDataSetWithValueFilter.java      | 68 ++++++++++++++++++++++
 .../db/query/reader/series/IReaderByTimestamp.java |  6 ++
 .../reader/series/SeriesReaderByTimestamp.java     | 16 +++++
 5 files changed, 95 insertions(+), 4 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index 9b78175..e77d7cd 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -217,6 +217,7 @@ public class ClusterReaderFactory {
       boolean ascending)
       throws StorageEngineException, EmptyIntervalException {
     // make sure the partition table is new
+    // TODO: it should not be necessary to sync metadata for every reader
     try {
       metaGroupMember.syncLeaderWithConsistencyCheck(false);
     } catch (CheckConsistencyException e) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
index bf48f10..e5349dc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
@@ -31,11 +31,11 @@ import java.util.List;
 
 public class RawQueryDataSetWithValueFilter extends QueryDataSet implements 
UDFInputDataSet {
 
-  private final TimeGenerator timeGenerator;
-  private final List<IReaderByTimestamp> seriesReaderByTimestampList;
-  private final List<Boolean> cached;
+  protected final TimeGenerator timeGenerator;
+  protected final List<IReaderByTimestamp> seriesReaderByTimestampList;
+  protected final List<Boolean> cached;
 
-  private boolean hasCachedRow;
+  protected boolean hasCachedRow;
   private RowRecord cachedRowRecord;
   private Object[] cachedRowInObjects;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/dataset/RemoteRawQueryDataSetWithValueFilter.java
 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/RemoteRawQueryDataSetWithValueFilter.java
new file mode 100644
index 0000000..c93e139
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/dataset/RemoteRawQueryDataSetWithValueFilter.java
@@ -0,0 +1,68 @@
+package org.apache.iotdb.db.query.dataset;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RemoteRawQueryDataSetWithValueFilter extends 
RawQueryDataSetWithValueFilter {
+
+  private List<RowRecord> cachedRowRecords = new ArrayList<>();
+  private Object[] objects;
+  private boolean[] isAllNull;
+  /**
+   * Constructor of RemoteRawQueryDataSetWithValueFilter; delegates to the superclass constructor.
+   *
+   * @param paths paths of the series, as a list
+   * @param dataTypes data types of the time series
+   * @param timeGenerator TimeGenerator that supplies the timestamps to fetch
+   * @param readers list of IReaderByTimestamp readers, queried with the generated timestamps
+   * @param cached flags passed through to the superclass constructor (TODO: document meaning)
+   * @param ascending specifies how the data should be sorted; {@code true} means
+   *     rows are read in ascending time order
+   */
+  public RemoteRawQueryDataSetWithValueFilter(
+      List<PartialPath> paths,
+      List<TSDataType> dataTypes,
+      TimeGenerator timeGenerator,
+      List<IReaderByTimestamp> readers,
+      List<Boolean> cached,
+      boolean ascending) {
+    super(paths, dataTypes, timeGenerator, readers, cached, ascending);
+  }
+
+  /**
+   * Caches a batch of row records (design pseudo code; not yet compilable).
+   *
+   * @return whether there is a next row record.
+   */
+  private boolean cacheRowRecord() throws IOException {
+    int cachedTimeCnt = 0;
+    long[] cachedTimeArray = new long[MAX_TIME_NUM];
+    // TODO: take the LIMIT constraint into account
+    while (timeGenerator.hasNext() && cachedTimeCnt < MAX_TIME_NUM) {
+      // 1. fill the time array from the time generator
+      cachedTimeArray[cachedTimeCnt++] = timeGenerator.next();
+    }
+    for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
+      // 2. fetch the values of each time series from its reader using the time array
+      Object[] results = 
seriesReaderByTimestampList.get(i).getValueInTimestamps(cachedTimeArray);
+      // 3. use the values in results to fill the row records
+      for (int j = 0; j < MAX_TIME_NUM; j++) {
+        if (i == 0) {
+          RowRecord rowRecord = new RowRecord(cachedTimeArray[]);
+        }
+        fillRowRecord();
+        if (results[j] != null) {
+          isAllNull = false;
+        }
+      }
+    }
+    // 4. remove a row record if all values at its timestamp are null
+    removeNonExistRecord();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
 
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
index 57d4813..28a5ad1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java
@@ -37,6 +37,12 @@ public interface IReaderByTimestamp {
   Object getValueInTimestamp(long timestamp) throws IOException;
 
   /**
+   * Returns the values corresponding to each timestamp in the given array. An entry is null if
+   * there is no value at that timestamp.
+   */
+  Object[] getValueInTimestamps(long[] timestamps) throws IOException;
+
+  /**
    * Returns whether there is no more data in reader.
    *
    * <p>True means no more data. False means you can still get more data
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
 
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
index f09b980..551c890 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java
@@ -77,6 +77,22 @@ public class SeriesReaderByTimestamp implements 
IReaderByTimestamp {
   }
 
   @Override
+  public Object[] getValueInTimestamps(long[] timestamp) throws IOException {
+    seriesReader.setTimeFilter(timestamp[0]);
+    Object[] results = new Object[timestamp.length];
+    for (int i = 0; i < timestamp.length; i++) {
+      if ((batchData == null || !hasAvailableData(batchData, timestamp[i]))
+          && !hasNext(timestamp[i])) {
+        // there is no more data; the remaining entries of results stay null
+        break;
+      }
+      results[i] = batchData.getValueInTimestamp(timestamp[i]);
+    }
+
+    return results;
+  }
+
+  @Override
   public boolean readerIsEmpty() throws IOException {
     return seriesReader.isEmpty() && isEmpty(batchData);
   }

Reply via email to