This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch nested-operations in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 025fd00aa3766e50e0baf0717b53893abf105375 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Sep 6 10:37:12 2021 +0800 SingleInputSingleOutputIntermediateLayer: constructPointReader & constructRowReader & constructRowSlidingSizeWindowReader --- ...SerializableTVListBackedSingleColumnWindow.java | 4 +- .../db/query/udf/core/layer/IntermediateLayer.java | 8 +- .../MultiInputMultiOutputIntermediateLayer.java | 27 +++- .../MultiInputSingleOutputIntermediateLayer.java | 27 +++- .../SingleInputMultiOutputIntermediateLayer.java | 29 +++- .../SingleInputSingleOutputIntermediateLayer.java | 166 ++++++++++++++++++++- .../iotdb/db/query/udf/core/layer/UDFLayer.java | 28 ++-- .../db/query/udf/core/reader/LayerRowReader.java | 2 +- 8 files changed, 262 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java index f91dd35..4157f90 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java @@ -76,14 +76,12 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin return rowIterator; } - private RowWindow seek(int beginIndex, int endIndex) { + public void seek(int beginIndex, int endIndex) { this.beginIndex = beginIndex; this.endIndex = endIndex; size = endIndex - beginIndex; row.seek(beginIndex); rowIterator = null; - - return this; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java index 069af89..49aae1a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.udf.core.layer; +import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy; import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy; import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy; @@ -41,7 +42,7 @@ public abstract class IntermediateLayer { public abstract LayerRowReader constructRowReader(); public final LayerRowWindowReader constructRowWindowReader( - AccessStrategy strategy, float memoryBudgetInMB) { + AccessStrategy strategy, float memoryBudgetInMB) throws QueryProcessException { switch (strategy.getAccessStrategyType()) { case SLIDING_TIME_WINDOW: return constructRowSlidingTimeWindowReader( @@ -56,10 +57,9 @@ public abstract class IntermediateLayer { } protected abstract LayerRowWindowReader constructRowSlidingSizeWindowReader( - SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB); + SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) + throws QueryProcessException; protected abstract LayerRowWindowReader constructRowSlidingTimeWindowReader( SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB); - - public abstract void updateEvictionUpperBound(); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java index 9cd5422..2a70a7b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java @@ -19,17 +19,40 @@ package org.apache.iotdb.db.query.udf.core.layer; +import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy; +import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; +import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader; +import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader; import java.util.List; -public class MultiInputMultiOutputIntermediateLayer implements IntermediateLayer { +public class MultiInputMultiOutputIntermediateLayer extends IntermediateLayer { public MultiInputMultiOutputIntermediateLayer( - List<LayerPointReader> parentLayerPointReaders, long queryId, float memoryBudgetInMB) {} + long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) { + super(queryId, memoryBudgetInMB); + } @Override public LayerPointReader constructPointReader() { return null; } + + @Override + public LayerRowReader constructRowReader() { + return null; + } + + @Override + protected LayerRowWindowReader constructRowSlidingSizeWindowReader( + SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) { + return null; + } + + @Override + protected LayerRowWindowReader constructRowSlidingTimeWindowReader( + SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) { + return null; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java index 71a4f06..c63b1be 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java @@ -19,17 +19,40 @@ package org.apache.iotdb.db.query.udf.core.layer; +import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy; +import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; +import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader; +import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader; import java.util.List; -public class MultiInputSingleOutputIntermediateLayer implements IntermediateLayer { +public class MultiInputSingleOutputIntermediateLayer extends IntermediateLayer { public MultiInputSingleOutputIntermediateLayer( - List<LayerPointReader> parentLayerPointReaders, long queryId, float memoryBudgetInMB) {} + long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) { + super(queryId, memoryBudgetInMB); + } @Override public LayerPointReader constructPointReader() { return null; } + + @Override + public LayerRowReader constructRowReader() { + return null; + } + + @Override + protected LayerRowWindowReader constructRowSlidingSizeWindowReader( + SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) { + return null; + } + + @Override + protected LayerRowWindowReader constructRowSlidingTimeWindowReader( + SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) { + return null; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java index a711ced..17467b8 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java @@ -20,27 +20,33 @@ package org.apache.iotdb.db.query.udf.core.layer; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy; +import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy; import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; +import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader; +import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader; import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; import java.io.IOException; -public class SingleInputMultiOutputIntermediateLayer implements IntermediateLayer { +public class SingleInputMultiOutputIntermediateLayer extends IntermediateLayer { private static final int CACHE_BLOCK_SIZE = 2; - private final TSDataType dataType; private final LayerPointReader parentLayerPointReader; + private final TSDataType dataType; private final ElasticSerializableTVList tvList; private final SafetyLine safetyLine; public SingleInputMultiOutputIntermediateLayer( - LayerPointReader parentLayerPointReader, long queryId, float memoryBudgetInMB) + long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader) throws QueryProcessException { + super(queryId, memoryBudgetInMB); this.parentLayerPointReader = parentLayerPointReader; + dataType = parentLayerPointReader.getDataType(); tvList = ElasticSerializableTVList.newElasticSerializableTVList( @@ -162,4 +168,21 @@ public class SingleInputMultiOutputIntermediateLayer implements IntermediateLaye } }; } + + @Override + public LayerRowReader constructRowReader() { + return null; + } + + @Override + protected LayerRowWindowReader constructRowSlidingSizeWindowReader( + SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) { + return null; + } + + @Override + protected LayerRowWindowReader constructRowSlidingTimeWindowReader( + SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) { + return null; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java index 131472a..3c1c705 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java @@ -19,13 +19,28 @@ package org.apache.iotdb.db.query.udf.core.layer; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.query.udf.api.access.Row; +import org.apache.iotdb.db.query.udf.api.access.RowWindow; +import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy; +import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy; +import org.apache.iotdb.db.query.udf.core.access.ElasticSerializableTVListBackedSingleColumnWindow; +import org.apache.iotdb.db.query.udf.core.access.LayerPointReaderBackedSingleColumnRow; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; +import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader; +import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader; +import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -public class SingleInputSingleOutputIntermediateLayer implements IntermediateLayer { +import java.io.IOException; + +public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer { private final LayerPointReader parentLayerPointReader; - public SingleInputSingleOutputIntermediateLayer(LayerPointReader parentLayerPointReader) { + public SingleInputSingleOutputIntermediateLayer( + long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader) { + super(queryId, memoryBudgetInMB); this.parentLayerPointReader = parentLayerPointReader; } @@ -33,4 +48,151 @@ public class SingleInputSingleOutputIntermediateLayer implements IntermediateLay public LayerPointReader constructPointReader() { return parentLayerPointReader; } + + @Override + public LayerRowReader constructRowReader() { + + return new LayerRowReader() { + + private final Row row = new LayerPointReaderBackedSingleColumnRow(parentLayerPointReader); + private final TSDataType[] dataTypes = + new TSDataType[] {parentLayerPointReader.getDataType()}; + private boolean hasCached = false; + + @Override + public boolean next() throws IOException, QueryProcessException { + if (hasCached) { + return true; + } + hasCached = parentLayerPointReader.next(); + return hasCached; + } + + @Override + public void readyForNext() { + parentLayerPointReader.readyForNext(); + hasCached = false; + } + + @Override + public TSDataType[] getDataTypes() { + return dataTypes; + } + + @Override + public long currentTime() throws IOException { + return parentLayerPointReader.currentTime(); + } + + @Override + public Row currentRow() { + return row; + } + }; + } + + @Override + protected LayerRowWindowReader constructRowSlidingSizeWindowReader( + SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) + throws QueryProcessException { + + return new LayerRowWindowReader() { + + private final int windowSize = strategy.getWindowSize(); + private final int slidingStep = strategy.getSlidingStep(); + + private final TSDataType dataType = parentLayerPointReader.getDataType(); + private final ElasticSerializableTVList tvList = + ElasticSerializableTVList.newElasticSerializableTVList( + dataType, queryId, memoryBudgetInMB, 2); + private final ElasticSerializableTVListBackedSingleColumnWindow window = + new ElasticSerializableTVListBackedSingleColumnWindow(tvList); + + private boolean hasCached = false; + private int beginIndex = -slidingStep; + + @Override + public boolean next() throws IOException, QueryProcessException { + if (hasCached) { + return true; + } + + beginIndex += slidingStep; + int endIndex = beginIndex + windowSize; + + int pointsToBeCollected = endIndex - tvList.size(); + if (0 < pointsToBeCollected) { + hasCached = collectPoints(pointsToBeCollected) != 0; + window.seek(beginIndex, tvList.size()); + } else { + hasCached = true; + window.seek(beginIndex, endIndex); + } + + return hasCached; + } + + /** @return number of actually collected, which may be less than or equals to pointNumber */ + private int collectPoints(int pointNumber) throws QueryProcessException, IOException { + int count = 0; + + while (parentLayerPointReader.next() && count < pointNumber) { + ++count; + + switch (dataType) { + case INT32: + tvList.putInt( + parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt()); + break; + case INT64: + tvList.putLong( + parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong()); + break; + case FLOAT: + tvList.putFloat( + parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat()); + break; + case DOUBLE: + tvList.putDouble( + parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble()); + break; + case BOOLEAN: + tvList.putBoolean( + parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean()); + break; + case TEXT: + tvList.putBinary( + parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary()); + break; + default: + } + + parentLayerPointReader.readyForNext(); + } + + return count; + } + + @Override + public void readyForNext() { + hasCached = false; + } + + @Override + public TSDataType[] getDataTypes() { + return new TSDataType[] {dataType}; + } + + @Override + public RowWindow currentWindow() { + return window; + } + }; + } + + @Override + protected LayerRowWindowReader constructRowSlidingTimeWindowReader( + SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) { + return null; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java index a613c4c..3148dd4 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java @@ -31,8 +31,8 @@ import org.apache.iotdb.db.query.udf.api.access.RowWindow; import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy; import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy; import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy; -import org.apache.iotdb.db.query.udf.core.access.RowImpl; -import org.apache.iotdb.db.query.udf.core.access.RowWindowImpl; +import org.apache.iotdb.db.query.udf.core.access.MultiColumnRow; +import org.apache.iotdb.db.query.udf.core.access.MultiColumnWindow; import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader; @@ -68,7 +68,7 @@ public class UDFLayer { List<TSDataType> dataTypes, List<ManagedSeriesReader> readers) throws QueryProcessException, IOException, InterruptedException { - constructInputLayer( + construct( queryId, memoryBudgetInMB, new RawQueryDataSetWithoutValueFilter(queryId, paths, dataTypes, readers, true)); @@ -84,14 +84,18 @@ public class UDFLayer { List<IReaderByTimestamp> readers, List<Boolean> cached) throws QueryProcessException { - constructInputLayer( + construct( queryId, memoryBudgetInMB, new RawQueryDataSetWithValueFilter(paths, dataTypes, timeGenerator, readers, cached, true)); } - private void constructInputLayer( - long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet) + public UDFLayer(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet) + throws QueryProcessException { + construct(queryId, memoryBudgetInMB, queryDataSet); + } + + private void construct(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet) throws QueryProcessException { this.queryId = queryId; this.queryDataSet = queryDataSet; @@ -242,7 +246,7 @@ public class UDFLayer { private boolean hasCachedRowRecord; private Object[] cachedRowRecord; - private final RowImpl row; + private final MultiColumnRow row; public InputLayerRowReader(int[] columnIndexes) { safetyPile = safetyLine.addSafetyPile(); @@ -253,7 +257,7 @@ public class UDFLayer { hasCachedRowRecord = false; cachedRowRecord = null; - row = new RowImpl(columnIndexes, dataTypes); + row = new MultiColumnRow(columnIndexes, dataTypes); } @Override @@ -321,7 +325,7 @@ public class UDFLayer { private final int windowSize; private final IntList rowIndexes; - private final RowWindowImpl rowWindow; + private final MultiColumnWindow rowWindow; private final int slidingStep; @@ -343,7 +347,7 @@ public class UDFLayer { windowSize < SerializableIntList.calculateCapacity(memoryBudgetInMB) ? new WrappedIntArray(windowSize) : new ElasticSerializableIntList(queryId, memoryBudgetInMB, 2); - rowWindow = new RowWindowImpl(rowRecordList, columnIndexes, dataTypes, rowIndexes); + rowWindow = new MultiColumnWindow(rowRecordList, columnIndexes, dataTypes, rowIndexes); slidingStep = accessStrategy.getSlidingStep(); @@ -452,7 +456,7 @@ public class UDFLayer { private final long displayWindowEnd; private final IntList rowIndexes; - private final RowWindowImpl rowWindow; + private final MultiColumnWindow rowWindow; private long nextWindowTimeBegin; private int nextIndexBegin; @@ -475,7 +479,7 @@ public class UDFLayer { displayWindowEnd = accessStrategy.getDisplayWindowEnd(); rowIndexes = new ElasticSerializableIntList(queryId, memoryBudgetInMB, 2); - rowWindow = new RowWindowImpl(rowRecordList, columnIndexes, dataTypes, rowIndexes); + rowWindow = new MultiColumnWindow(rowRecordList, columnIndexes, dataTypes, rowIndexes); nextWindowTimeBegin = accessStrategy.getDisplayWindowBegin(); nextIndexBegin = 0; diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java index 413a24d..0bc6521 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java @@ -33,7 +33,7 @@ public interface LayerRowReader { TSDataType[] getDataTypes(); - long currentTime(); + long currentTime() throws IOException; Row currentRow(); }
