This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch udf-operator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2e7d195563dde8bbffe68bd44e654cf603958cef Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Apr 12 19:58:59 2022 +0800 TsBlockInputDataSet --- .../db/mpp/operator/source/SeriesScanUtil.java | 14 ++-- .../query/udf/core/layer/TsBlockInputDataSet.java | 64 ++++++++++++++++++ .../iotdb/tsfile/read/common/block/TsBlock.java | 78 +++++++++++++++++----- .../read/common/block/column/BinaryColumn.java | 5 ++ .../read/common/block/column/BooleanColumn.java | 5 ++ .../tsfile/read/common/block/column/Column.java | 5 ++ .../read/common/block/column/DoubleColumn.java | 5 ++ .../read/common/block/column/FloatColumn.java | 5 ++ .../tsfile/read/common/block/column/IntColumn.java | 5 ++ .../read/common/block/column/LongColumn.java | 5 ++ .../block/column/RunLengthEncodedColumn.java | 6 ++ .../read/common/block/column/TimeColumn.java | 5 ++ 12 files changed, 182 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java index 80e0a67771..f65947a45d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java @@ -47,7 +47,13 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import java.io.IOException; import java.io.Serializable; -import java.util.*; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.Set; import java.util.function.ToLongFunction; import java.util.stream.Collectors; @@ -705,7 +711,7 @@ public class SeriesScanUtil { mergeReader.addReader( firstPageReader .getAllSatisfiedPageData(orderUtils.getAscending()) - .getTsBlockIterator(), + .getTsBlockColumnIterator(), firstPageReader.version, orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()), context); @@ -732,7 +738,7 @@ public class SeriesScanUtil { mergeReader.addReader( pageReader .getAllSatisfiedPageData(orderUtils.getAscending()) - .getTsBlockIterator(), + .getTsBlockColumnIterator(), pageReader.version, orderUtils.getOverlapCheckTime(pageReader.getStatistics()), context); @@ -913,7 +919,7 @@ public class SeriesScanUtil { private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException { mergeReader.addReader( - pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getTsBlockIterator(), + pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getTsBlockColumnIterator(), pageReader.version, orderUtils.getOverlapCheckTime(pageReader.getStatistics()), context); diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/TsBlockInputDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/TsBlockInputDataSet.java new file mode 100644 index 0000000000..a12e5a0cab --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/TsBlockInputDataSet.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.udf.core.layer; + +import org.apache.iotdb.db.mpp.operator.Operator; +import org.apache.iotdb.db.query.dataset.IUDFInputDataSet; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockRowIterator; + +import java.util.List; + +public class TsBlockInputDataSet implements IUDFInputDataSet { + + private final Operator operator; + private final List<TSDataType> dataTypes; + + private TsBlockRowIterator tsBlockRowIterator; + + public TsBlockInputDataSet(Operator operator, List<TSDataType> dataTypes) { + this.operator = operator; + this.dataTypes = dataTypes; + } + + @Override + public List<TSDataType> getDataTypes() { + return dataTypes; + } + + @Override + public boolean hasNextRowInObjects() { + if (tsBlockRowIterator != null && tsBlockRowIterator.hasNext()) { + return true; + } + + if (!operator.hasNext()) { + return false; + } + + tsBlockRowIterator = operator.next().getTsBlockRowIterator(); + return true; + } + + @Override + public Object[] nextRowInObjects() { + return tsBlockRowIterator.next(); + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java index 29a17a1a79..261c6337d7 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java @@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import org.openjdk.jol.info.ClassLayout; import java.util.Arrays; +import java.util.Iterator; import static io.airlift.slice.SizeOf.sizeOf; import static java.lang.String.format; @@ -173,8 +174,16 @@ public class TsBlock { return valueColumns[columnIndex]; } - public TsBlockIterator getTsBlockIterator() { - return new TsBlockIterator(0); + public TsBlockColumnIterator getTsBlockColumnIterator() { + return new TsBlockColumnIterator(0); + } + + public TsBlockColumnIterator getTsBlockColumnIterator(int columnIndex) { + return new TsBlockColumnIterator(0, columnIndex); + } + + public TsBlockRowIterator getTsBlockRowIterator() { + return new TsBlockRowIterator(0); } /** Only used for the batch data of vector time series. */ @@ -182,17 +191,24 @@ public class TsBlock { return new AlignedTsBlockIterator(0, subIndex); } - private class TsBlockIterator implements IPointReader, IBatchDataIterator { + private class TsBlockColumnIterator implements IPointReader, IBatchDataIterator { + + protected int rowIndex; + protected int columnIndex; - protected int index; + public TsBlockColumnIterator(int rowIndex) { + this.rowIndex = rowIndex; + this.columnIndex = 0; + } - public TsBlockIterator(int index) { - this.index = index; + public TsBlockColumnIterator(int rowIndex, int columnIndex) { + this.rowIndex = rowIndex; + this.columnIndex = columnIndex; } @Override public boolean hasNext() { - return index < positionCount; + return rowIndex < positionCount; } @Override @@ -202,22 +218,22 @@ public class TsBlock { @Override public void next() { - index++; + rowIndex++; } @Override public long currentTime() { - return timeColumn.getLong(index); + return timeColumn.getLong(rowIndex); } @Override public Object currentValue() { - return valueColumns[0].getTsPrimitiveType(index).getValue(); + return valueColumns[columnIndex].getTsPrimitiveType(rowIndex).getValue(); } @Override public void reset() { - index = 0; + rowIndex = 0; } @Override @@ -240,14 +256,44 @@ public class TsBlock { @Override public TimeValuePair currentTimeValuePair() { return new TimeValuePair( - timeColumn.getLong(index), valueColumns[0].getTsPrimitiveType(index)); + timeColumn.getLong(rowIndex), valueColumns[columnIndex].getTsPrimitiveType(rowIndex)); } @Override public void close() {} } - private class AlignedTsBlockIterator extends TsBlockIterator { + public class TsBlockRowIterator implements Iterator<Object[]> { + + protected int rowIndex; + protected int columnCount; + + public TsBlockRowIterator(int rowIndex) { + this.rowIndex = rowIndex; + columnCount = getValueColumnCount(); + } + + @Override + public boolean hasNext() { + return rowIndex < positionCount; + } + + @Override + public Object[] next() { + int columnCount = getValueColumnCount(); + Object[] row = new Object[columnCount + 1]; + for (int i = 0; i < columnCount; ++i) { + row[i] = valueColumns[i].getObject(rowIndex); + } + row[columnCount] = timeColumn.getObject(rowIndex); + + rowIndex++; + + return row; + } + } + + private class AlignedTsBlockIterator extends TsBlockColumnIterator { private final int subIndex; @@ -277,7 +323,7 @@ public class TsBlock { @Override public Object currentValue() { - TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(index); + TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(rowIndex); return v == null ? null : v.getValue(); } @@ -286,12 +332,12 @@ public class TsBlock { // aligned timeseries' BatchData length() may return the length of time column // we need traverse to VectorBatchDataIterator calculate the actual value column's length int cnt = 0; - int indexSave = index; + int indexSave = rowIndex; while (hasNext()) { cnt++; next(); } - index = indexSave; + rowIndex = indexSave; return cnt; } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java index 8828393a68..9d95c4edcc 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java @@ -74,6 +74,11 @@ public class BinaryColumn implements Column { return values[position + arrayOffset]; } + @Override + public Object getObject(int position) { + return getBinary(position); + } + @Override public TsPrimitiveType getTsPrimitiveType(int position) { checkReadablePosition(position); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java index 54544d3650..66ef186690 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java @@ -73,6 +73,11 @@ public class BooleanColumn implements Column { return values[position + arrayOffset]; } + @Override + public Object getObject(int position) { + return getBoolean(position); + } + @Override public TsPrimitiveType getTsPrimitiveType(int position) { checkReadablePosition(position); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java index adc06a1f52..67e5d2f601 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java @@ -53,6 +53,11 @@ public interface Column { throw new UnsupportedOperationException(getClass().getName()); } + /** Gets an Object at {@code position}. */ + default Object getObject(int position) { + throw new UnsupportedOperationException(getClass().getName()); + } + /** Gets a TsPrimitiveType at {@code position}. */ default TsPrimitiveType getTsPrimitiveType(int position) { throw new UnsupportedOperationException(getClass().getName()); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java index 32809b02f6..7abfa05a8e 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java @@ -73,6 +73,11 @@ public class DoubleColumn implements Column { return values[position + arrayOffset]; } + @Override + public Object getObject(int position) { + return getDouble(position); + } + @Override public TsPrimitiveType getTsPrimitiveType(int position) { checkReadablePosition(position); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java index 51a2675dae..25bbe44fdf 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java @@ -72,6 +72,11 @@ public class FloatColumn implements Column { return values[position + arrayOffset]; } + @Override + public Object getObject(int position) { + return getFloat(position); + } + @Override public TsPrimitiveType getTsPrimitiveType(int position) { checkReadablePosition(position); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java index 0d48dd2133..49d3357c1f 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java @@ -72,6 +72,11 @@ public class IntColumn implements Column { return values[position + arrayOffset]; } + @Override + public Object getObject(int position) { + return getInt(position); + } + @Override public TsPrimitiveType getTsPrimitiveType(int position) { checkReadablePosition(position); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java index 345e71d5bc..e3838d77e7 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java @@ -72,6 +72,11 @@ public class LongColumn implements Column { return values[position + arrayOffset]; } + @Override + public Object getObject(int position) { + return getLong(position); + } + @Override public TsPrimitiveType getTsPrimitiveType(int position) { checkReadablePosition(position); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java index 39775002dd..e60d7ebc66 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java @@ -97,6 +97,12 @@ public class RunLengthEncodedColumn implements Column { return value.getBinary(position); } + @Override + public Object getObject(int position) { + checkReadablePosition(position); + return value.getObject(position); + } + @Override public TsPrimitiveType getTsPrimitiveType(int position) { checkReadablePosition(position); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java index c46fbd2ea3..e80b670f84 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java @@ -62,6 +62,11 @@ public class TimeColumn implements Column { return values[position + arrayOffset]; } + @Override + public Object getObject(int position) { + return getLong(position); + } + @Override public boolean isNull(int position) { return false;
