This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch clusterQueryOpt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5e415497a9e7a513eb7d290a562c7f3ae014991e Author: Alima777 <[email protected]> AuthorDate: Tue Mar 9 10:18:18 2021 +0800 design and pseudo code --- .../cluster/query/reader/ClusterReaderFactory.java | 1 + .../dataset/RawQueryDataSetWithValueFilter.java | 8 +-- .../RemoteRawQueryDataSetWithValueFilter.java | 68 ++++++++++++++++++++++ .../db/query/reader/series/IReaderByTimestamp.java | 6 ++ .../reader/series/SeriesReaderByTimestamp.java | 16 +++++ 5 files changed, 95 insertions(+), 4 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java index 9b78175..e77d7cd 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java @@ -217,6 +217,7 @@ public class ClusterReaderFactory { boolean ascending) throws StorageEngineException, EmptyIntervalException { // make sure the partition table is new + // TODO: don't need to sync metadata for every reader try { metaGroupMember.syncLeaderWithConsistencyCheck(false); } catch (CheckConsistencyException e) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java index bf48f10..e5349dc 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java @@ -31,11 +31,11 @@ import java.util.List; public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFInputDataSet { - private final TimeGenerator timeGenerator; - private final List<IReaderByTimestamp> seriesReaderByTimestampList; - private final List<Boolean> cached; + protected final TimeGenerator timeGenerator; + protected final List<IReaderByTimestamp> seriesReaderByTimestampList; + protected final List<Boolean> cached; - private boolean hasCachedRow; + protected boolean hasCachedRow; private RowRecord cachedRowRecord; private Object[] cachedRowInObjects; diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RemoteRawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RemoteRawQueryDataSetWithValueFilter.java new file mode 100644 index 0000000..c93e139 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RemoteRawQueryDataSetWithValueFilter.java @@ -0,0 +1,68 @@ +package org.apache.iotdb.db.query.dataset; + +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class RemoteRawQueryDataSetWithValueFilter extends RawQueryDataSetWithValueFilter { + + private List<RowRecord> cachedRowRecords = new ArrayList<>(); + private Object[] objects; + private boolean[] isAllNull; + /** + * constructor of EngineDataSetWithValueFilter. + * + * @param paths paths in List structure + * @param dataTypes time series data type + * @param timeGenerator EngineTimeGenerator object + * @param readers readers in List(IReaderByTimeStamp) structure + * @param cached + * @param ascending specifies how the data should be sorted,'True' means read in ascending time + */ + public RemoteRawQueryDataSetWithValueFilter( + List<PartialPath> paths, + List<TSDataType> dataTypes, + TimeGenerator timeGenerator, + List<IReaderByTimestamp> readers, + List<Boolean> cached, + boolean ascending) { + super(paths, dataTypes, timeGenerator, readers, cached, ascending); + } + + /** + * Cache row record + * + * @return if there has next row record. + */ + private boolean cacheRowRecord() throws IOException { + int cachedTimeCnt = 0; + long[] cachedTimeArray = new long[MAX_TIME_NUM]; + // TODO: LIMIT constraint + while (timeGenerator.hasNext() && cachedTimeCnt < MAX_TIME_NUM) { + // 1. fill time array from time Generator + cachedTimeArray[cachedTimeCnt++] = timeGenerator.next(); + } + for (int i = 0; i < seriesReaderByTimestampList.size(); i++) { + // 2. fetch results of each time series from readers using time array + Object[] results = seriesReaderByTimestampList.get(i).getValueInTimestamps(cachedTimeArray); + // 3. use values in results to fill row record + for (int j = 0; j < MAX_TIME_NUM; j++) { + if (i == 0) { + RowRecord rowRecord = new RowRecord(cachedTimeArray[]); + } + fillRowRecord(); + if (results[j] != null) { + isAllNull = false; + } + } + } + // 4. remove rowRecord if all values in one timestamp are null + removeNonExistRecord(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java index 57d4813..28a5ad1 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IReaderByTimestamp.java @@ -37,6 +37,12 @@ public interface IReaderByTimestamp { Object getValueInTimestamp(long timestamp) throws IOException; /** + * Returns all the corresponding values under the array of timestamp. Returns null if no value + * under one timestamp. + */ + Object[] getValueInTimestamps(long[] timestamps) throws IOException; + + /** * Returns whether there is no more data in reader. * * <p>True means no more data. False means you can still get more data diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java index f09b980..551c890 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java @@ -77,6 +77,22 @@ public class SeriesReaderByTimestamp implements IReaderByTimestamp { } @Override + public Object[] getValueInTimestamps(long[] timestamp) throws IOException { + seriesReader.setTimeFilter(timestamp[0]); + Object[] results = new Object[timestamp.length]; + for (int i = 0; i < timestamp.length; i++) { + if ((batchData == null || !hasAvailableData(batchData, timestamp[i])) + && !hasNext(timestamp[i])) { + // there is no more data + break; + } + results[i] = batchData.getValueInTimestamp(timestamp[i]); + } + + return results; + } + + @Override public boolean readerIsEmpty() throws IOException { return seriesReader.isEmpty() && isEmpty(batchData); }
