This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch abstract_SeriesReaderWithoutValueFilter in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 40c6e94f97bbb5aa6720f067c786d6ae7bba5734 Author: jt2594838 <[email protected]> AuthorDate: Thu Jan 2 10:10:31 2020 +0800 abstract an interface ManagedSeriesReader from SeriesReaderWithoutValueFilter --- .../NewEngineDataSetWithoutValueFilter.java | 14 ++++---- .../iotdb/db/query/executor/EngineExecutor.java | 5 +-- .../iotdb/db/query/reader/ManagedSeriesReader.java | 37 ++++++++++++++++++++++ .../SeriesReaderWithoutValueFilter.java | 8 +++-- .../SeriesReaderWithValueFilterTest.java | 3 +- .../SeriesReaderWithoutValueFilterTest.java | 5 +-- 6 files changed, 58 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java index e3329ce..2590fa1 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.dataset; import org.apache.iotdb.db.query.pool.QueryTaskPoolManager; -import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFilter; +import org.apache.iotdb.db.query.reader.ManagedSeriesReader; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; @@ -45,10 +45,10 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet { private static class ReadTask implements Runnable { - private final SeriesReaderWithoutValueFilter reader; + private final ManagedSeriesReader reader; private BlockingQueue<BatchData> blockingQueue; - public ReadTask(SeriesReaderWithoutValueFilter reader, BlockingQueue<BatchData> blockingQueue) { + public ReadTask(ManagedSeriesReader reader, BlockingQueue<BatchData> blockingQueue) { this.reader = reader; this.blockingQueue = blockingQueue; } @@ -95,7 +95,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet { } } - private List<SeriesReaderWithoutValueFilter> seriesReaderWithoutValueFilterList; + private List<ManagedSeriesReader> seriesReaderWithoutValueFilterList; private TreeSet<Long> timeHeap; @@ -130,7 +130,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet { * @param readers readers in List(IPointReader) structure */ public NewEngineDataSetWithoutValueFilter(List<Path> paths, List<TSDataType> dataTypes, - List<SeriesReaderWithoutValueFilter> readers) throws InterruptedException { + List<ManagedSeriesReader> readers) throws InterruptedException { super(paths, dataTypes); this.seriesReaderWithoutValueFilterList = readers; blockingQueueArray = new BlockingQueue[readers.size()]; @@ -145,7 +145,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet { private void init() throws InterruptedException { timeHeap = new TreeSet<>(); for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) { - SeriesReaderWithoutValueFilter reader = seriesReaderWithoutValueFilterList.get(i); + ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i); reader.setHasRemaining(true); reader.setManagedByQueryManager(true); pool.submit(new ReadTask(reader, blockingQueueArray[i])); @@ -341,7 +341,7 @@ public class NewEngineDataSetWithoutValueFilter extends QueryDataSet { synchronized (seriesReaderWithoutValueFilterList.get(seriesIndex)) { // we only need to judge whether to submit another task when the queue is not full if (blockingQueueArray[seriesIndex].remainingCapacity() > 0) { - SeriesReaderWithoutValueFilter reader = seriesReaderWithoutValueFilterList.get(seriesIndex); + ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(seriesIndex); // if the reader isn't being managed and still has more data, // that means this read task leave the pool before because the queue has no more space // now we should submit it again diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java index 56962c5..f6582ff 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.dataset.EngineDataSetWithValueFilter; import org.apache.iotdb.db.query.dataset.NewEngineDataSetWithoutValueFilter; import org.apache.iotdb.db.query.reader.IReaderByTimestamp; +import org.apache.iotdb.db.query.reader.ManagedSeriesReader; import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderByTimestamp; import org.apache.iotdb.db.query.reader.seriesRelated.SeriesReaderWithoutValueFilter; import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator; @@ -70,12 +71,12 @@ public class EngineExecutor { timeFilter = ((GlobalTimeExpression) optimizedExpression).getFilter(); } - List<SeriesReaderWithoutValueFilter> readersOfSelectedSeries = new ArrayList<>(); + List<ManagedSeriesReader> readersOfSelectedSeries = new ArrayList<>(); for (int i = 0; i < deduplicatedPaths.size(); i++) { Path path = deduplicatedPaths.get(i); TSDataType dataType = deduplicatedDataTypes.get(i); - SeriesReaderWithoutValueFilter reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context, + ManagedSeriesReader reader = new SeriesReaderWithoutValueFilter(path, dataType, timeFilter, context, true); readersOfSelectedSeries.add(reader); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/ManagedSeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/ManagedSeriesReader.java new file mode 100644 index 0000000..67925be --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/ManagedSeriesReader.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.reader; + +import org.apache.iotdb.tsfile.read.reader.IBatchReader; + +/** + * ManagedSeriesReader is a combination of IBatchReader and IPointReader that provides + * additional interfaces to make it able to be run in a thread pool concurrently within a query. + */ +public interface ManagedSeriesReader extends IBatchReader, IPointReader { + + boolean isManagedByQueryManager(); + + void setManagedByQueryManager(boolean managedByQueryManager); + + boolean hasRemaining(); + + void setHasRemaining(boolean hasRemaining); +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java index f9124fc..65d4585 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilter.java @@ -23,7 +23,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; -import org.apache.iotdb.db.query.reader.IPointReader; +import org.apache.iotdb.db.query.reader.ManagedSeriesReader; import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader; import org.apache.iotdb.db.query.reader.resourceRelated.NewUnseqResourceMergeReader; import org.apache.iotdb.db.utils.TimeValuePair; @@ -40,7 +40,7 @@ import java.io.IOException; * * "without value filter" is equivalent to "with global time filter or without any filter". */ -public class SeriesReaderWithoutValueFilter implements IBatchReader, IPointReader { +public class SeriesReaderWithoutValueFilter implements ManagedSeriesReader { private IBatchReader seqResourceIterateReader; private IBatchReader unseqResourceMergeReader; @@ -111,18 +111,22 @@ public class SeriesReaderWithoutValueFilter implements IBatchReader, IPointReade this.unseqResourceMergeReader = unseqResourceMergeReader; } + @Override public boolean isManagedByQueryManager() { return managedByQueryManager; } + @Override public void setManagedByQueryManager(boolean managedByQueryManager) { this.managedByQueryManager = managedByQueryManager; } + @Override public boolean hasRemaining() { return hasRemaining; } + @Override public void setHasRemaining(boolean hasRemaining) { this.hasRemaining = hasRemaining; } diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java index 3e9de56..ff02b72 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithValueFilterTest.java @@ -20,13 +20,14 @@ package org.apache.iotdb.db.query.reader.seriesRelated; import java.io.IOException; +import org.apache.iotdb.db.query.reader.ManagedSeriesReader; import org.apache.iotdb.tsfile.read.reader.IBatchReader; import org.apache.iotdb.tsfile.read.filter.ValueFilter; import org.junit.Test; public class SeriesReaderWithValueFilterTest { - private SeriesReaderWithoutValueFilter reader; + private ManagedSeriesReader reader; private void init() throws IOException { // (100,0),(105,1),(110,0),(115,1),(120,0),... diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java index ff06f20..141f210 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesReaderWithoutValueFilterTest.java @@ -20,13 +20,14 @@ package org.apache.iotdb.db.query.reader.seriesRelated; import java.io.IOException; +import org.apache.iotdb.db.query.reader.ManagedSeriesReader; import org.apache.iotdb.tsfile.read.reader.IBatchReader; import org.junit.Test; public class SeriesReaderWithoutValueFilterTest { - private SeriesReaderWithoutValueFilter reader1; - private SeriesReaderWithoutValueFilter reader2; + private ManagedSeriesReader reader1; + private ManagedSeriesReader reader2; private void init() throws IOException { IBatchReader batchReader1 = new FakedIBatchPoint(100, 1000, 7, 11);
