This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch TYQuery
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TYQuery by this push:
new 1797834 [To Vector] Modify RawQueryDataSetWithoutValueFilter to fit
for Vector type (#2853)
1797834 is described below
commit 17978348c2a0cb4b36768d0916f01eaf842b7fdf
Author: wshao08 <[email protected]>
AuthorDate: Tue Mar 16 14:10:32 2021 +0800
[To Vector] Modify RawQueryDataSetWithoutValueFilter to fit for Vector type
(#2853)
* Modify RawQueryDataSetWithoutValueFilter to fit for Vector type
* Fix bug
---
.../dataset/RawQueryDataSetWithoutValueFilter.java | 110 +++++++++++++++++----
1 file changed, 92 insertions(+), 18 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
index 63f08c1..b4150bb 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithoutValueFilter.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.dataset;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.query.control.QueryTimeManager;
import org.apache.iotdb.db.query.pool.QueryTaskPoolManager;
import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
@@ -37,6 +38,7 @@ import
org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -213,16 +215,24 @@ public class RawQueryDataSetWithoutValueFilter extends
QueryDataSet
TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
PublicBAOS timeBAOS = new PublicBAOS();
- PublicBAOS[] valueBAOSList = new PublicBAOS[seriesNum];
- PublicBAOS[] bitmapBAOSList = new PublicBAOS[seriesNum];
+ int bufferNum = 0;
+ for (int index = 0; index < seriesNum; index++) {
+ if (paths.get(index) instanceof VectorPartialPath) {
+ bufferNum += ((VectorPartialPath)
paths).getSubSensorsPathList().size();
+ } else {
+ bufferNum += 1;
+ }
+ }
+ PublicBAOS[] valueBAOSList = new PublicBAOS[bufferNum];
+ PublicBAOS[] bitmapBAOSList = new PublicBAOS[bufferNum];
- for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
- valueBAOSList[seriesIndex] = new PublicBAOS();
- bitmapBAOSList[seriesIndex] = new PublicBAOS();
+ for (int bufferIndex = 0; bufferIndex < bufferNum; bufferIndex++) {
+ valueBAOSList[bufferIndex] = new PublicBAOS();
+ bitmapBAOSList[bufferIndex] = new PublicBAOS();
}
// used to record a bitmap for every 8 row records
- int[] currentBitmapList = new int[seriesNum];
+ int[] currentBitmapList = new int[bufferNum];
int rowCount = 0;
while (rowCount < fetchSize) {
@@ -236,56 +246,120 @@ public class RawQueryDataSetWithoutValueFilter extends
QueryDataSet
timeBAOS.write(BytesUtils.longToBytes(minTime));
}
- for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+ for (int seriesIndex = 0, bufferIndex = 0; seriesIndex < seriesNum;
seriesIndex++) {
if (cachedBatchDataArray[seriesIndex] == null
|| !cachedBatchDataArray[seriesIndex].hasCurrent()
|| cachedBatchDataArray[seriesIndex].currentTime() != minTime) {
// current batch is empty or does not have value at minTime
if (rowOffset == 0) {
- currentBitmapList[seriesIndex] = (currentBitmapList[seriesIndex]
<< 1);
+ if (paths.get(seriesIndex) instanceof VectorPartialPath) {
+ for (int i = 0; i < ((VectorPartialPath)
paths).getSubSensorsPathList().size(); i++) {
+ currentBitmapList[bufferIndex] =
(currentBitmapList[bufferIndex] << 1);
+ bufferIndex++;
+ }
+ } else {
+ currentBitmapList[bufferIndex] = (currentBitmapList[bufferIndex]
<< 1);
+ bufferIndex++;
+ }
}
} else {
// current batch has value at minTime, consume current value
if (rowOffset == 0) {
- currentBitmapList[seriesIndex] = (currentBitmapList[seriesIndex]
<< 1) | FLAG;
TSDataType type = cachedBatchDataArray[seriesIndex].getDataType();
switch (type) {
case INT32:
+ currentBitmapList[bufferIndex] =
(currentBitmapList[bufferIndex] << 1) | FLAG;
int intValue = cachedBatchDataArray[seriesIndex].getInt();
if (encoder != null && encoder.needEncode(minTime)) {
intValue = encoder.encodeInt(intValue, minTime);
}
ReadWriteIOUtils.write(intValue, valueBAOSList[seriesIndex]);
+ bufferIndex++;
break;
case INT64:
+ currentBitmapList[bufferIndex] =
(currentBitmapList[bufferIndex] << 1) | FLAG;
long longValue = cachedBatchDataArray[seriesIndex].getLong();
if (encoder != null && encoder.needEncode(minTime)) {
longValue = encoder.encodeLong(longValue, minTime);
}
ReadWriteIOUtils.write(longValue, valueBAOSList[seriesIndex]);
+ bufferIndex++;
break;
case FLOAT:
+ currentBitmapList[bufferIndex] =
(currentBitmapList[bufferIndex] << 1) | FLAG;
float floatValue =
cachedBatchDataArray[seriesIndex].getFloat();
if (encoder != null && encoder.needEncode(minTime)) {
floatValue = encoder.encodeFloat(floatValue, minTime);
}
ReadWriteIOUtils.write(floatValue, valueBAOSList[seriesIndex]);
+ bufferIndex++;
break;
case DOUBLE:
+ currentBitmapList[bufferIndex] =
(currentBitmapList[bufferIndex] << 1) | FLAG;
double doubleValue =
cachedBatchDataArray[seriesIndex].getDouble();
if (encoder != null && encoder.needEncode(minTime)) {
doubleValue = encoder.encodeDouble(doubleValue, minTime);
}
ReadWriteIOUtils.write(doubleValue,
valueBAOSList[seriesIndex]);
+ bufferIndex++;
break;
case BOOLEAN:
+ currentBitmapList[bufferIndex] =
(currentBitmapList[bufferIndex] << 1) | FLAG;
ReadWriteIOUtils.write(
cachedBatchDataArray[seriesIndex].getBoolean(),
valueBAOSList[seriesIndex]);
+ bufferIndex++;
break;
case TEXT:
+ currentBitmapList[bufferIndex] =
(currentBitmapList[bufferIndex] << 1) | FLAG;
ReadWriteIOUtils.write(
cachedBatchDataArray[seriesIndex].getBinary(),
valueBAOSList[seriesIndex]);
+ bufferIndex++;
+ break;
+ case VECTOR:
+ for (TsPrimitiveType primitiveVal :
cachedBatchDataArray[seriesIndex].getVector()) {
+ currentBitmapList[bufferIndex] =
(currentBitmapList[bufferIndex] << 1) | FLAG;
+ switch (primitiveVal.getDataType()) {
+ case INT32:
+ int intVal = primitiveVal.getInt();
+ if (encoder != null && encoder.needEncode(minTime)) {
+ intVal = encoder.encodeInt(intVal, minTime);
+ }
+ ReadWriteIOUtils.write(intVal,
valueBAOSList[bufferIndex]);
+ break;
+ case INT64:
+ long longVal = primitiveVal.getLong();
+ if (encoder != null && encoder.needEncode(minTime)) {
+ longVal = encoder.encodeLong(longVal, minTime);
+ }
+ ReadWriteIOUtils.write(longVal,
valueBAOSList[bufferIndex]);
+ break;
+ case FLOAT:
+ float floatVal = primitiveVal.getFloat();
+ if (encoder != null && encoder.needEncode(minTime)) {
+ floatVal = encoder.encodeFloat(floatVal, minTime);
+ }
+ ReadWriteIOUtils.write(floatVal,
valueBAOSList[bufferIndex]);
+ break;
+ case DOUBLE:
+ double doubleVal = primitiveVal.getDouble();
+ if (encoder != null && encoder.needEncode(minTime)) {
+ doubleVal = encoder.encodeDouble(doubleVal, minTime);
+ }
+ ReadWriteIOUtils.write(doubleVal,
valueBAOSList[bufferIndex]);
+ break;
+ case BOOLEAN:
+ ReadWriteIOUtils.write(primitiveVal.getBoolean(),
valueBAOSList[bufferIndex]);
+ break;
+ case TEXT:
+ ReadWriteIOUtils.write(primitiveVal.getBinary(),
valueBAOSList[bufferIndex]);
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.",
type));
+ }
+ bufferIndex++;
+ }
break;
default:
throw new UnSupportedDataTypeException(
@@ -315,11 +389,11 @@ public class RawQueryDataSetWithoutValueFilter extends
QueryDataSet
if (rowOffset == 0) {
rowCount++;
if (rowCount % 8 == 0) {
- for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+ for (int bufferIndex = 0; bufferIndex < bufferNum; bufferIndex++) {
ReadWriteIOUtils.write(
- (byte) currentBitmapList[seriesIndex],
bitmapBAOSList[seriesIndex]);
+ (byte) currentBitmapList[bufferIndex],
bitmapBAOSList[bufferIndex]);
// we should clear the bitmap every 8 row record
- currentBitmapList[seriesIndex] = 0;
+ currentBitmapList[bufferIndex] = 0;
}
}
if (rowLimit > 0) {
@@ -337,10 +411,10 @@ public class RawQueryDataSetWithoutValueFilter extends
QueryDataSet
if (rowCount > 0) {
int remaining = rowCount % 8;
if (remaining != 0) {
- for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+ for (int bufferIndex = 0; bufferIndex < bufferNum; bufferIndex++) {
ReadWriteIOUtils.write(
- (byte) (currentBitmapList[seriesIndex] << (8 - remaining)),
- bitmapBAOSList[seriesIndex]);
+ (byte) (currentBitmapList[bufferIndex] << (8 - remaining)),
+ bitmapBAOSList[bufferIndex]);
}
}
}
@@ -354,13 +428,13 @@ public class RawQueryDataSetWithoutValueFilter extends
QueryDataSet
List<ByteBuffer> valueBufferList = new ArrayList<>();
List<ByteBuffer> bitmapBufferList = new ArrayList<>();
- for (int seriesIndex = 0; seriesIndex < seriesNum; seriesIndex++) {
+ for (int bufferIndex = 0; bufferIndex < bufferNum; bufferIndex++) {
// add value buffer of current series
- putPBOSToBuffer(valueBAOSList, valueBufferList, seriesIndex);
+ putPBOSToBuffer(valueBAOSList, valueBufferList, bufferIndex);
// add bitmap buffer of current series
- putPBOSToBuffer(bitmapBAOSList, bitmapBufferList, seriesIndex);
+ putPBOSToBuffer(bitmapBAOSList, bitmapBufferList, bufferIndex);
}
// set value buffers and bitmap buffers