This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch iotdb-1971 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit afb1d5f33ce9c401e5c08248a6bdf33e9b925125 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Dec 1 18:04:56 2021 +0800 move packBuffer and putPBOSToBuffer into QueryDataSetUtils --- .../query/dataset/udf/UDTFAlignByTimeDataSet.java | 40 +----- .../db/query/dataset/udf/UDTFJoinDataSet.java | 140 +++++++++++++++++++-- .../apache/iotdb/db/utils/QueryDataSetUtils.java | 33 +++++ 3 files changed, 168 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java index 7572516..7c7f1bb 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader; import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; +import org.apache.iotdb.db.utils.QueryDataSetUtils; import org.apache.iotdb.db.utils.datastructure.TimeSelector; import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; @@ -40,8 +41,6 @@ import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignByTimeDataSet { @@ -109,8 +108,6 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy } public QueryDataSet executeInFragmentsIfPossible() throws QueryProcessException, IOException { - // TODO make the behaviour of the return value of layerBuilder.generateJoinDataSet() the same as - // TODO the original dataset return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet(this) : this; } @@ -237,39 +234,8 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy } } - return packBuffer(tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList); - } - - protected TSQueryDataSet packBuffer( - TSQueryDataSet tsQueryDataSet, - PublicBAOS timeBAOS, - PublicBAOS[] valueBAOSList, - PublicBAOS[] bitmapBAOSList) { - int columnsNum = transformers.length; - - ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size()); - timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size()); - timeBuffer.flip(); - tsQueryDataSet.setTime(timeBuffer); - - List<ByteBuffer> valueBufferList = new ArrayList<>(); - List<ByteBuffer> bitmapBufferList = new ArrayList<>(); - for (int i = 0; i < columnsNum; ++i) { - putPBOSToBuffer(valueBAOSList, valueBufferList, i); - putPBOSToBuffer(bitmapBAOSList, bitmapBufferList, i); - } - tsQueryDataSet.setValueList(valueBufferList); - tsQueryDataSet.setBitmapList(bitmapBufferList); - - return tsQueryDataSet; - } - - protected void putPBOSToBuffer( - PublicBAOS[] bitmapBAOSList, List<ByteBuffer> bitmapBufferList, int tsIndex) { - ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapBAOSList[tsIndex].size()); - bitmapBuffer.put(bitmapBAOSList[tsIndex].getBuf(), 0, bitmapBAOSList[tsIndex].size()); - bitmapBuffer.flip(); - bitmapBufferList.add(bitmapBuffer); + return QueryDataSetUtils.packBuffer( + tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList, transformers.length); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java index f92f600..1e7bd02 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java @@ -19,15 +19,19 @@ package org.apache.iotdb.db.query.dataset.udf; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet; +import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; +import org.apache.iotdb.db.utils.QueryDataSetUtils; import org.apache.iotdb.db.utils.datastructure.TimeSelector; +import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.apache.iotdb.tsfile.utils.PublicBAOS; import java.io.IOException; -public class UDTFJoinDataSet extends QueryDataSet -// implements DirectAlignByTimeDataSet -{ +public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet { private final UDTFFragmentDataSet[] fragmentDataSets; @@ -77,6 +81,131 @@ public class UDTFJoinDataSet extends QueryDataSet } @Override + public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) + throws IOException, QueryProcessException { + TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); + + PublicBAOS timeBAOS = new PublicBAOS(); + PublicBAOS[] valueBAOSList = new PublicBAOS[resultColumnsLength]; + PublicBAOS[] bitmapBAOSList = new PublicBAOS[resultColumnsLength]; + for (int i = 0; i < resultColumnsLength; ++i) { + valueBAOSList[i] = new PublicBAOS(); + bitmapBAOSList[i] = new PublicBAOS(); + } + int[] currentBitmapList = new int[resultColumnsLength]; + + // int rowCount = 0; + // while (rowCount < fetchSize + // && (rowLimit <= 0 || alreadyReturnedRowNum < rowLimit) + // && !timeHeap.isEmpty()) { + // + // long minTime = timeHeap.pollFirst(); + // if (rowOffset == 0) { + // timeBAOS.write(BytesUtils.longToBytes(minTime)); + // } + // + // for (int i = 0; i < resultColumnsLength; ++i) { + // LayerPointReader reader = transformers[i]; + // + // if (!reader.next() || reader.currentTime() != minTime) { + // if (rowOffset == 0) { + // currentBitmapList[i] = (currentBitmapList[i] << 1); + // } + // continue; + // } + // + // if (rowOffset == 0) { + // currentBitmapList[i] = (currentBitmapList[i] << 1) | FLAG; + // TSDataType type = reader.getDataType(); + // switch (type) { + // case INT32: + // int intValue = reader.currentInt(); + // ReadWriteIOUtils.write( + // encoder != null && encoder.needEncode(minTime) + // ? encoder.encodeInt(intValue, minTime) + // : intValue, + // valueBAOSList[i]); + // break; + // case INT64: + // long longValue = reader.currentLong(); + // ReadWriteIOUtils.write( + // encoder != null && encoder.needEncode(minTime) + // ? encoder.encodeLong(longValue, minTime) + // : longValue, + // valueBAOSList[i]); + // break; + // case FLOAT: + // float floatValue = reader.currentFloat(); + // ReadWriteIOUtils.write( + // encoder != null && encoder.needEncode(minTime) + // ? encoder.encodeFloat(floatValue, minTime) + // : floatValue, + // valueBAOSList[i]); + // break; + // case DOUBLE: + // double doubleValue = reader.currentDouble(); + // ReadWriteIOUtils.write( + // encoder != null && encoder.needEncode(minTime) + // ? encoder.encodeDouble(doubleValue, minTime) + // : doubleValue, + // valueBAOSList[i]); + // break; + // case BOOLEAN: + // ReadWriteIOUtils.write(reader.currentBoolean(), valueBAOSList[i]); + // break; + // case TEXT: + // ReadWriteIOUtils.write(reader.currentBinary(), valueBAOSList[i]); + // break; + // default: + // throw new UnSupportedDataTypeException( + // String.format("Data type %s is not supported.", type)); + // } + // } + // + // reader.readyForNext(); + // + // if (reader.next()) { + // timeHeap.add(reader.currentTime()); + // } + // } + // + // if (rowOffset == 0) { + // ++rowCount; + // if (rowCount % 8 == 0) { + // for (int i = 0; i < resultColumnsLength; ++i) { + // ReadWriteIOUtils.write((byte) currentBitmapList[i], bitmapBAOSList[i]); + // currentBitmapList[i] = 0; + // } + // } + // if (rowLimit > 0) { + // ++alreadyReturnedRowNum; + // } + // } else { + // --rowOffset; + // } + // + // rawQueryInputLayer.updateRowRecordListEvictionUpperBound(); + // } + // + // /* + // * feed the bitmap with remaining 0 in the right + // * if current bitmap is 00011111 and remaining is 3, after feeding the bitmap is 11111000 + // */ + // if (rowCount > 0) { + // int remaining = rowCount % 8; + // if (remaining != 0) { + // for (int i = 0; i < resultColumnsLength; ++i) { + // ReadWriteIOUtils.write( + // (byte) (currentBitmapList[i] << (8 - remaining)), bitmapBAOSList[i]); + // } + // } + // } + + return QueryDataSetUtils.packBuffer( + tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList, resultColumnsLength); + } + + @Override public boolean hasNextWithoutConstraint() throws IOException { return !timeHeap.isEmpty(); } @@ -114,9 +243,4 @@ public class UDTFJoinDataSet extends QueryDataSet return rowRecord; } - - // @Override - // public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) { - // throw new NotImplementedException(); - // } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index 34c1483..7816169 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -28,11 +28,13 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.utils.BytesUtils; +import org.apache.iotdb.tsfile.utils.PublicBAOS; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -269,4 +271,35 @@ public class QueryDataSetUtils { } return values; } + + public static TSQueryDataSet packBuffer( + TSQueryDataSet tsQueryDataSet, + PublicBAOS timeBAOS, + PublicBAOS[] valueBAOSList, + PublicBAOS[] bitmapBAOSList, + int columnsLength) { + ByteBuffer timeBuffer = ByteBuffer.allocate(timeBAOS.size()); + timeBuffer.put(timeBAOS.getBuf(), 0, timeBAOS.size()); + timeBuffer.flip(); + tsQueryDataSet.setTime(timeBuffer); + + List<ByteBuffer> valueBufferList = new ArrayList<>(); + List<ByteBuffer> bitmapBufferList = new ArrayList<>(); + for (int i = 0; i < columnsLength; ++i) { + putPBOSToBuffer(valueBAOSList, valueBufferList, i); + putPBOSToBuffer(bitmapBAOSList, bitmapBufferList, i); + } + tsQueryDataSet.setValueList(valueBufferList); + tsQueryDataSet.setBitmapList(bitmapBufferList); + + return tsQueryDataSet; + } + + public static void putPBOSToBuffer( + PublicBAOS[] bitmapBAOSList, List<ByteBuffer> bitmapBufferList, int tsIndex) { + ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapBAOSList[tsIndex].size()); + bitmapBuffer.put(bitmapBAOSList[tsIndex].getBuf(), 0, bitmapBAOSList[tsIndex].size()); + bitmapBuffer.flip(); + bitmapBufferList.add(bitmapBuffer); + } }
