This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/sonar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/sonar by this push:
new 3ae34524940
server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java done
3ae34524940 is described below
commit 3ae34524940a16e82c27d546b881571ee2972919
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jun 20 12:31:56 2023 +0800
server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java done
---
.../db/mpp/plan/parser/StatementGenerator.java | 6 +-
.../planner/plan/node/write/InsertTabletNode.java | 9 +-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 546 ++++++++++++++-------
3 files changed, 377 insertions(+), 184 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 5fc4f3a2348..10d60e2f9ed 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -319,7 +319,8 @@ public class StatementGenerator {
insertTabletReq.size));
insertStatement.setBitMaps(
QueryDataSetUtils.readBitMapsFromBuffer(
- insertTabletReq.values, insertTabletReq.types.size(),
insertTabletReq.size));
+ insertTabletReq.values, insertTabletReq.types.size(),
insertTabletReq.size)
+ .orElse(null));
insertStatement.setRowCount(insertTabletReq.size);
TSDataType[] dataTypes = new TSDataType[insertTabletReq.types.size()];
for (int i = 0; i < insertTabletReq.types.size(); i++) {
@@ -351,7 +352,8 @@ public class StatementGenerator {
req.sizeList.get(i)));
insertTabletStatement.setBitMaps(
QueryDataSetUtils.readBitMapsFromBuffer(
- req.valuesList.get(i), req.measurementsList.get(i).size(),
req.sizeList.get(i)));
+ req.valuesList.get(i), req.measurementsList.get(i).size(),
req.sizeList.get(i))
+ .orElse(null));
insertTabletStatement.setRowCount(req.sizeList.get(i));
TSDataType[] dataTypes = new TSDataType[req.typesList.get(i).size()];
for (int j = 0; j < dataTypes.length; j++) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index 2501c535a0c..d3131de1c60 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -657,7 +657,8 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
if (hasBitMaps) {
- bitMaps = QueryDataSetUtils.readBitMapsFromBuffer(buffer,
measurementSize, rowCount);
+ bitMaps =
+ QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize,
rowCount).orElse(null);
}
columns =
QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes,
measurementSize, rowCount);
@@ -888,7 +889,8 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
boolean hasBitMaps = BytesUtils.byteToBool(stream.readByte());
if (hasBitMaps) {
- bitMaps = QueryDataSetUtils.readBitMapsFromStream(stream,
measurementSize, rowCount);
+ bitMaps =
+ QueryDataSetUtils.readBitMapsFromStream(stream, measurementSize,
rowCount).orElse(null);
}
columns =
QueryDataSetUtils.readTabletValuesFromStream(stream, dataTypes,
measurementSize, rowCount);
@@ -927,7 +929,8 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
if (hasBitMaps) {
- bitMaps = QueryDataSetUtils.readBitMapsFromBuffer(buffer,
measurementSize, rowCount);
+ bitMaps =
+ QueryDataSetUtils.readBitMapsFromBuffer(buffer, measurementSize,
rowCount).orElse(null);
}
columns =
QueryDataSetUtils.readTabletValuesFromBuffer(buffer, dataTypes,
measurementSize, rowCount);
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 6baee727383..9e21484d7ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.utils;
import org.apache.iotdb.commons.exception.IoTDBException;
@@ -51,7 +52,6 @@ public class QueryDataSetUtils {
IQueryExecution queryExecution, int fetchSize) throws IOException,
IoTDBException {
boolean finished = false;
int columnNum = queryExecution.getOutputValueColumnCount();
- TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
// one time column and each value column has an actual value buffer and a
bitmap value to
// indicate whether it is a null
int columnNumWithTime = columnNum * 2 + 1;
@@ -74,139 +74,283 @@ public class QueryDataSetUtils {
break;
}
TsBlock tsBlock = optionalTsBlock.get();
- if (tsBlock.isEmpty()) {
- continue;
+ if (!tsBlock.isEmpty()) {
+ int currentCount = tsBlock.getPositionCount();
+ serializeTsBlock(
+ rowCount,
+ currentCount,
+ tsBlock,
+ columnNum,
+ dataOutputStreams,
+ valueOccupation,
+ bitmaps);
+ rowCount += currentCount;
+ }
+ }
+
+ fillRemainingBitMap(rowCount, columnNum, dataOutputStreams, bitmaps);
+
+ TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+
+ fillTimeColumn(rowCount, byteArrayOutputStreams, tsQueryDataSet);
+
+ fillValueColumnsAndBitMaps(rowCount, byteArrayOutputStreams,
valueOccupation, tsQueryDataSet);
+
+ return new Pair<>(tsQueryDataSet, finished);
+ }
+
+ /**
+  * Appends one TsBlock to the per-column output streams.
+  *
+  * <p>Stream 0 receives the time column. For value column {@code k}, stream {@code 2 * k + 1}
+  * receives the values and stream {@code 2 * k + 2} receives the null-indicator bitmap.
+  *
+  * @param rowCount number of rows serialized before this block
+  * @param currentCount number of rows in {@code tsBlock}
+  * @param tsBlock the block to serialize
+  * @param columnNum number of value columns
+  * @param dataOutputStreams the time, value and bitmap streams, laid out as described above
+  * @param valueOccupation per-column counter updated by the column serializers
+  * @param bitmaps per-column bitmap byte under construction
+  * @throws IOException if writing to one of the streams fails
+  * @throws UnSupportedDataTypeException if a column has a data type not handled here
+  */
+ private static void serializeTsBlock(
+     int rowCount,
+     int currentCount,
+     TsBlock tsBlock,
+     int columnNum,
+     DataOutputStream[] dataOutputStreams,
+     int[] valueOccupation,
+     int[] bitmaps)
+     throws IOException {
+   // time column goes to the first stream
+   for (int row = 0; row < currentCount; row++) {
+     dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(row));
+   }
+
+   // every value column is followed by its bitmap stream
+   for (int columnIndex = 0; columnIndex < columnNum; columnIndex++) {
+     int valueStreamIndex = 2 * columnIndex + 1;
+     DataOutputStream valueStream = dataOutputStreams[valueStreamIndex];
+     DataOutputStream bitmapStream = dataOutputStreams[2 * (columnIndex + 1)];
+
+     Column column = tsBlock.getColumn(columnIndex);
+     TSDataType type = column.getDataType();
+     switch (type) {
+       case INT32:
+         doWithInt32Column(
+             rowCount, column, bitmaps, columnIndex, valueStream, valueOccupation, bitmapStream);
+         break;
+       case INT64:
+         doWithInt64Column(
+             rowCount, column, bitmaps, columnIndex, valueStream, valueOccupation, bitmapStream);
+         break;
+       case FLOAT:
+         doWithFloatColumn(
+             rowCount, column, bitmaps, columnIndex, valueStream, valueOccupation, bitmapStream);
+         break;
+       case DOUBLE:
+         doWithDoubleColumn(
+             rowCount, column, bitmaps, columnIndex, valueStream, valueOccupation, bitmapStream);
+         break;
+       case BOOLEAN:
+         doWithBooleanColumn(
+             rowCount, column, bitmaps, columnIndex, valueStream, valueOccupation, bitmapStream);
+         break;
+       case TEXT:
+         doWithTextColumn(
+             rowCount, column, bitmaps, columnIndex, valueStream, valueOccupation, bitmapStream);
+         break;
+       default:
+         throw new UnSupportedDataTypeException(
+             String.format("Data type %s is not supported.", type));
+     }
+   }
+ }
- int currentCount = tsBlock.getPositionCount();
- // serialize time column
- for (int i = 0; i < currentCount; i++) {
- // use columnOutput to write byte array
- dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
+ private static void doWithInt32Column(
+ int rowCount,
+ Column column,
+ int[] bitmaps,
+ int columnIndex,
+ DataOutputStream dataOutputStream,
+ int[] valueOccupation,
+ DataOutputStream dataBitmapOutputStream)
+ throws IOException {
+ for (int i = 0, size = column.getPositionCount(); i < size; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[columnIndex] = bitmaps[columnIndex] << 1;
+ } else {
+ bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
+ dataOutputStream.writeInt(column.getInt(i));
+ valueOccupation[columnIndex] += 4;
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
+ // we should clear the bitmap every 8 points
+ bitmaps[columnIndex] = 0;
}
+ }
+ }
- // serialize each value column and its bitmap
- for (int k = 0; k < columnNum; k++) {
- // get DataOutputStream for current value column and its bitmap
- DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
- DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k +
1)];
+ /**
+  * Serializes one INT64 column: non-null values go to {@code dataOutputStream}, and one
+  * bitmap bit per row is accumulated in {@code bitmaps[columnIndex]} and written to
+  * {@code dataBitmapOutputStream} each time the running row count reaches a multiple of 8.
+  *
+  * @param rowCount rows serialized before this column; only a local copy is advanced
+  * @param column the column to serialize
+  * @param bitmaps per-column bitmap byte under construction
+  * @param columnIndex index of this column in {@code bitmaps} and {@code valueOccupation}
+  * @param dataOutputStream destination of the values
+  * @param valueOccupation per-column byte counter; grows by 8 for every value written
+  * @param dataBitmapOutputStream destination of the completed bitmap bytes
+  * @throws IOException if writing to one of the streams fails
+  */
+ private static void doWithInt64Column(
+     int rowCount,
+     Column column,
+     int[] bitmaps,
+     int columnIndex,
+     DataOutputStream dataOutputStream,
+     int[] valueOccupation,
+     DataOutputStream dataBitmapOutputStream)
+     throws IOException {
+   for (int i = 0, size = column.getPositionCount(); i < size; i++) {
+     rowCount++;
+     if (column.isNull(i)) {
+       bitmaps[columnIndex] = bitmaps[columnIndex] << 1;
+     } else {
+       bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
+       dataOutputStream.writeLong(column.getLong(i));
+       // writeLong emits 8 bytes, so the occupation must grow by 8 (not 4)
+       valueOccupation[columnIndex] += 8;
+     }
+     if (rowCount != 0 && rowCount % 8 == 0) {
+       dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
+       // we should clear the bitmap every 8 points
+       bitmaps[columnIndex] = 0;
+     }
+   }
+ }
- Column column = tsBlock.getColumn(k);
- TSDataType type = column.getDataType();
- switch (type) {
- case INT32:
- for (int i = 0; i < currentCount; i++) {
- rowCount++;
- if (column.isNull(i)) {
- bitmaps[k] = bitmaps[k] << 1;
- } else {
- bitmaps[k] = (bitmaps[k] << 1) | FLAG;
- dataOutputStream.writeInt(column.getInt(i));
- valueOccupation[k] += 4;
- }
- if (rowCount != 0 && rowCount % 8 == 0) {
- dataBitmapOutputStream.writeByte(bitmaps[k]);
- // we should clear the bitmap every 8 points
- bitmaps[k] = 0;
- }
- }
- break;
- case INT64:
- for (int i = 0; i < currentCount; i++) {
- rowCount++;
- if (column.isNull(i)) {
- bitmaps[k] = bitmaps[k] << 1;
- } else {
- bitmaps[k] = (bitmaps[k] << 1) | FLAG;
- dataOutputStream.writeLong(column.getLong(i));
- valueOccupation[k] += 8;
- }
- if (rowCount != 0 && rowCount % 8 == 0) {
- dataBitmapOutputStream.writeByte(bitmaps[k]);
- // we should clear the bitmap every 8 points
- bitmaps[k] = 0;
- }
- }
- break;
- case FLOAT:
- for (int i = 0; i < currentCount; i++) {
- rowCount++;
- if (column.isNull(i)) {
- bitmaps[k] = bitmaps[k] << 1;
- } else {
- bitmaps[k] = (bitmaps[k] << 1) | FLAG;
- dataOutputStream.writeFloat(column.getFloat(i));
- valueOccupation[k] += 4;
- }
- if (rowCount != 0 && rowCount % 8 == 0) {
- dataBitmapOutputStream.writeByte(bitmaps[k]);
- // we should clear the bitmap every 8 points
- bitmaps[k] = 0;
- }
- }
- break;
- case DOUBLE:
- for (int i = 0; i < currentCount; i++) {
- rowCount++;
- if (column.isNull(i)) {
- bitmaps[k] = bitmaps[k] << 1;
- } else {
- bitmaps[k] = (bitmaps[k] << 1) | FLAG;
- dataOutputStream.writeDouble(column.getDouble(i));
- valueOccupation[k] += 8;
- }
- if (rowCount != 0 && rowCount % 8 == 0) {
- dataBitmapOutputStream.writeByte(bitmaps[k]);
- // we should clear the bitmap every 8 points
- bitmaps[k] = 0;
- }
- }
- break;
- case BOOLEAN:
- for (int i = 0; i < currentCount; i++) {
- rowCount++;
- if (column.isNull(i)) {
- bitmaps[k] = bitmaps[k] << 1;
- } else {
- bitmaps[k] = (bitmaps[k] << 1) | FLAG;
- dataOutputStream.writeBoolean(column.getBoolean(i));
- valueOccupation[k] += 1;
- }
- if (rowCount != 0 && rowCount % 8 == 0) {
- dataBitmapOutputStream.writeByte(bitmaps[k]);
- // we should clear the bitmap every 8 points
- bitmaps[k] = 0;
- }
- }
- break;
- case TEXT:
- for (int i = 0; i < currentCount; i++) {
- rowCount++;
- if (column.isNull(i)) {
- bitmaps[k] = bitmaps[k] << 1;
- } else {
- bitmaps[k] = (bitmaps[k] << 1) | FLAG;
- Binary binary = column.getBinary(i);
- dataOutputStream.writeInt(binary.getLength());
- dataOutputStream.write(binary.getValues());
- valueOccupation[k] = valueOccupation[k] + 4 +
binary.getLength();
- }
- if (rowCount != 0 && rowCount % 8 == 0) {
- dataBitmapOutputStream.writeByte(bitmaps[k]);
- // we should clear the bitmap every 8 points
- bitmaps[k] = 0;
- }
- }
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", type));
- }
- if (k != columnNum - 1) {
- rowCount -= currentCount;
- }
+ /**
+  * Serializes one FLOAT column: non-null values go to {@code dataOutputStream}, and one
+  * bitmap bit per row is accumulated in {@code bitmaps[columnIndex]} and written to
+  * {@code dataBitmapOutputStream} each time the running row count reaches a multiple of 8.
+  *
+  * @param rowCount rows serialized before this column; only a local copy is advanced
+  * @param column the column to serialize
+  * @param bitmaps per-column bitmap byte under construction
+  * @param columnIndex index of this column in {@code bitmaps} and {@code valueOccupation}
+  * @param dataOutputStream destination of the values
+  * @param valueOccupation per-column byte counter; grows by 4 for every value written
+  * @param dataBitmapOutputStream destination of the completed bitmap bytes
+  * @throws IOException if writing to one of the streams fails
+  */
+ private static void doWithFloatColumn(
+     int rowCount,
+     Column column,
+     int[] bitmaps,
+     int columnIndex,
+     DataOutputStream dataOutputStream,
+     int[] valueOccupation,
+     DataOutputStream dataBitmapOutputStream)
+     throws IOException {
+   int total = column.getPositionCount();
+   int position = 0;
+   while (position < total) {
+     rowCount++;
+     if (!column.isNull(position)) {
+       // present value: shift in a set bit and emit the float
+       bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
+       dataOutputStream.writeFloat(column.getFloat(position));
+       valueOccupation[columnIndex] += 4;
+     } else {
+       // null value: shift in a cleared bit only
+       bitmaps[columnIndex] = bitmaps[columnIndex] << 1;
+     }
+     if (rowCount != 0 && rowCount % 8 == 0) {
+       dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
+       // a full byte has been flushed; restart the bitmap
+       bitmaps[columnIndex] = 0;
+     }
+     position++;
+   }
+ }
+
+ /**
+  * Serializes one DOUBLE column: non-null values go to {@code dataOutputStream}, and one
+  * bitmap bit per row is accumulated in {@code bitmaps[columnIndex]} and written to
+  * {@code dataBitmapOutputStream} each time the running row count reaches a multiple of 8.
+  *
+  * @param rowCount rows serialized before this column; only a local copy is advanced
+  * @param column the column to serialize
+  * @param bitmaps per-column bitmap byte under construction
+  * @param columnIndex index of this column in {@code bitmaps} and {@code valueOccupation}
+  * @param dataOutputStream destination of the values
+  * @param valueOccupation per-column byte counter; grows by 8 for every value written
+  * @param dataBitmapOutputStream destination of the completed bitmap bytes
+  * @throws IOException if writing to one of the streams fails
+  */
+ private static void doWithDoubleColumn(
+     int rowCount,
+     Column column,
+     int[] bitmaps,
+     int columnIndex,
+     DataOutputStream dataOutputStream,
+     int[] valueOccupation,
+     DataOutputStream dataBitmapOutputStream)
+     throws IOException {
+   for (int i = 0, size = column.getPositionCount(); i < size; i++) {
+     rowCount++;
+     if (column.isNull(i)) {
+       bitmaps[columnIndex] = bitmaps[columnIndex] << 1;
+     } else {
+       bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
+       dataOutputStream.writeDouble(column.getDouble(i));
+       // writeDouble emits 8 bytes, so the occupation must grow by 8 (not 4)
+       valueOccupation[columnIndex] += 8;
+     }
+     if (rowCount != 0 && rowCount % 8 == 0) {
+       dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
+       // we should clear the bitmap every 8 points
+       bitmaps[columnIndex] = 0;
+     }
+   }
+ }
+
+ /**
+  * Serializes one BOOLEAN column: non-null values go to {@code dataOutputStream}, and one
+  * bitmap bit per row is accumulated in {@code bitmaps[columnIndex]} and written to
+  * {@code dataBitmapOutputStream} each time the running row count reaches a multiple of 8.
+  *
+  * @param rowCount rows serialized before this column; only a local copy is advanced
+  * @param column the column to serialize
+  * @param bitmaps per-column bitmap byte under construction
+  * @param columnIndex index of this column in {@code bitmaps} and {@code valueOccupation}
+  * @param dataOutputStream destination of the values
+  * @param valueOccupation per-column byte counter; grows by 1 for every value written
+  * @param dataBitmapOutputStream destination of the completed bitmap bytes
+  * @throws IOException if writing to one of the streams fails
+  */
+ private static void doWithBooleanColumn(
+     int rowCount,
+     Column column,
+     int[] bitmaps,
+     int columnIndex,
+     DataOutputStream dataOutputStream,
+     int[] valueOccupation,
+     DataOutputStream dataBitmapOutputStream)
+     throws IOException {
+   for (int i = 0, size = column.getPositionCount(); i < size; i++) {
+     rowCount++;
+     if (column.isNull(i)) {
+       bitmaps[columnIndex] = bitmaps[columnIndex] << 1;
+     } else {
+       bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
+       dataOutputStream.writeBoolean(column.getBoolean(i));
+       // writeBoolean emits a single byte, so the occupation must grow by 1 (not 4)
+       valueOccupation[columnIndex] += 1;
+     }
+     if (rowCount != 0 && rowCount % 8 == 0) {
+       dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
+       // we should clear the bitmap every 8 points
+       bitmaps[columnIndex] = 0;
+     }
+   }
+ }
+
+ private static void doWithTextColumn(
+ int rowCount,
+ Column column,
+ int[] bitmaps,
+ int columnIndex,
+ DataOutputStream dataOutputStream,
+ int[] valueOccupation,
+ DataOutputStream dataBitmapOutputStream)
+ throws IOException {
+ for (int i = 0, size = column.getPositionCount(); i < size; i++) {
+ rowCount++;
+ if (column.isNull(i)) {
+ bitmaps[columnIndex] = bitmaps[columnIndex] << 1;
+ } else {
+ bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG;
+ Binary binary = column.getBinary(i);
+ dataOutputStream.writeInt(binary.getLength());
+ dataOutputStream.write(binary.getValues());
+ valueOccupation[columnIndex] = valueOccupation[columnIndex] + 4 +
binary.getLength();
+ }
+ if (rowCount != 0 && rowCount % 8 == 0) {
+ dataBitmapOutputStream.writeByte(bitmaps[columnIndex]);
+ // we should clear the bitmap every 8 points
+ bitmaps[columnIndex] = 0;
+ }
+ }
+ }
+
+ private static void fillRemainingBitMap(
+ int rowCount, int columnNum, DataOutputStream[] dataOutputStreams, int[]
bitmaps)
+ throws IOException {
// feed the remaining bitmap
int remaining = rowCount % 8;
for (int k = 0; k < columnNum; k++) {
@@ -215,14 +359,23 @@ public class QueryDataSetUtils {
dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining));
}
}
+ }
+ /**
+  * Copies the serialized time column (stream 0) into a ByteBuffer sized for
+  * {@code rowCount} 8-byte timestamps and stores it in {@code tsQueryDataSet}.
+  *
+  * @param rowCount number of serialized rows
+  * @param byteArrayOutputStreams backing streams; index 0 holds the time column
+  * @param tsQueryDataSet the data set that receives the time buffer
+  */
+ private static void fillTimeColumn(
+     int rowCount, ByteArrayOutputStream[] byteArrayOutputStreams, TSQueryDataSet tsQueryDataSet) {
+   // each timestamp was written with writeLong, i.e. 8 bytes per row
+   ByteBuffer timeBuffer = ByteBuffer.allocate(rowCount * 8);
+   byte[] serializedTimes = byteArrayOutputStreams[0].toByteArray();
+   timeBuffer.put(serializedTimes);
+   // switch the buffer from writing to reading before handing it over
+   timeBuffer.flip();
+   tsQueryDataSet.setTime(timeBuffer);
+ }
+ private static void fillValueColumnsAndBitMaps(
+ int rowCount,
+ ByteArrayOutputStream[] byteArrayOutputStreams,
+ int[] valueOccupation,
+ TSQueryDataSet tsQueryDataSet) {
// calculate the bitmap buffer size
int bitmapOccupation = (rowCount + 7) / 8;
@@ -241,11 +394,17 @@ public class QueryDataSetUtils {
}
tsQueryDataSet.setBitmapList(bitmapList);
tsQueryDataSet.setValueList(valueList);
- return new Pair<>(tsQueryDataSet, finished);
}
- /** pair.left is serialized TsBlock pair.right indicates if the query
finished */
- // To fetch required amounts of data and combine them through List
+ /**
+ * Fetches the required amount of data and combines it into a List.
+ *
+ * @param queryExecution the source of TsBlocks, also used to judge whether there is more data
+ * @param fetchSize the wanted number of rows
+ * @return pair.left is the serialized TsBlocks; pair.right indicates whether the query finished
+ * @throws IoTDBException if an error happens while getting a TsBlock from the IQueryExecution
+ */
public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize(
IQueryExecution queryExecution, int fetchSize) throws IoTDBException {
int rowCount = 0;
@@ -287,9 +446,9 @@ public class QueryDataSetUtils {
return times;
}
- public static BitMap[] readBitMapsFromBuffer(ByteBuffer buffer, int columns,
int size) {
+ public static Optional<BitMap[]> readBitMapsFromBuffer(ByteBuffer buffer,
int columns, int size) {
if (!buffer.hasRemaining()) {
- return null;
+ return Optional.empty();
}
BitMap[] bitMaps = new BitMap[columns];
for (int i = 0; i < columns; i++) {
@@ -302,13 +461,13 @@ public class QueryDataSetUtils {
bitMaps[i] = new BitMap(size, bytes);
}
}
- return bitMaps;
+ return Optional.of(bitMaps);
}
- public static BitMap[] readBitMapsFromStream(DataInputStream stream, int
columns, int size)
- throws IOException {
+ public static Optional<BitMap[]> readBitMapsFromStream(
+ DataInputStream stream, int columns, int size) throws IOException {
if (stream.available() <= 0) {
- return null;
+ return Optional.empty();
}
BitMap[] bitMaps = new BitMap[columns];
for (int i = 0; i < columns; i++) {
@@ -321,7 +480,7 @@ public class QueryDataSetUtils {
bitMaps[i] = new BitMap(size, bytes);
}
}
- return bitMaps;
+ return Optional.of(bitMaps);
}
public static Object[] readTabletValuesFromBuffer(
@@ -333,19 +492,14 @@ public class QueryDataSetUtils {
return readTabletValuesFromBuffer(buffer, dataTypes, columns, size);
}
- public static Object[] readTabletValuesFromStream(
- DataInputStream stream, List<Integer> types, int columns, int size)
throws IOException {
- TSDataType[] dataTypes = new TSDataType[types.size()];
- for (int i = 0; i < dataTypes.length; i++) {
- dataTypes[i] = TSDataType.values()[types.get(i)];
- }
- return readTabletValuesFromStream(stream, dataTypes, columns, size);
- }
-
/**
+ * Deserialize Tablet Values From Buffer
+ *
* @param buffer data values
* @param columns column number
* @param size value count in each column
+ * @throws UnSupportedDataTypeException if the TSDataType of a column is unknown
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
public static Object[] readTabletValuesFromBuffer(
@@ -412,49 +566,22 @@ public class QueryDataSetUtils {
for (int i = 0; i < columns; i++) {
switch (types[i]) {
case BOOLEAN:
- boolean[] boolValues = new boolean[size];
- for (int index = 0; index < size; index++) {
- boolValues[index] = BytesUtils.byteToBool(stream.readByte());
- }
- values[i] = boolValues;
+ parseBooleanColumn(size, stream, values, i);
break;
case INT32:
- int[] intValues = new int[size];
- for (int index = 0; index < size; index++) {
- intValues[index] = stream.readInt();
- }
- values[i] = intValues;
+ parseInt32Column(size, stream, values, i);
break;
case INT64:
- long[] longValues = new long[size];
- for (int index = 0; index < size; index++) {
- longValues[index] = stream.readLong();
- }
- values[i] = longValues;
+ parseInt64Column(size, stream, values, i);
break;
case FLOAT:
- float[] floatValues = new float[size];
- for (int index = 0; index < size; index++) {
- floatValues[index] = stream.readFloat();
- }
- values[i] = floatValues;
+ parseFloatColumn(size, stream, values, i);
break;
case DOUBLE:
- double[] doubleValues = new double[size];
- for (int index = 0; index < size; index++) {
- doubleValues[index] = stream.readDouble();
- }
- values[i] = doubleValues;
+ parseDoubleColumn(size, stream, values, i);
break;
case TEXT:
- Binary[] binaryValues = new Binary[size];
- for (int index = 0; index < size; index++) {
- int binarySize = stream.readInt();
- byte[] binaryValue = new byte[binarySize];
- stream.read(binaryValue);
- binaryValues[index] = new Binary(binaryValue);
- }
- values[i] = binaryValues;
+ parseTextColumn(size, stream, values, i);
break;
default:
throw new UnSupportedDataTypeException(
@@ -463,4 +590,65 @@ public class QueryDataSetUtils {
}
return values;
}
+
+ /**
+  * Reads {@code size} boolean values (one byte each) from {@code stream} and stores the
+  * resulting {@code boolean[]} in {@code values[columnIndex]}.
+  *
+  * @throws IOException if reading from the stream fails
+  */
+ private static void parseBooleanColumn(
+     int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException {
+   boolean[] parsed = new boolean[size];
+   for (int row = 0; row < size; row++) {
+     byte raw = stream.readByte();
+     parsed[row] = BytesUtils.byteToBool(raw);
+   }
+   // publish the column only after every value has been read
+   values[columnIndex] = parsed;
+ }
+
+ /**
+  * Reads {@code size} 4-byte integers from {@code stream} and stores the resulting
+  * {@code int[]} in {@code values[columnIndex]}.
+  *
+  * @throws IOException if reading from the stream fails
+  */
+ private static void parseInt32Column(
+     int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException {
+   int[] parsed = new int[size];
+   int row = 0;
+   while (row < size) {
+     parsed[row] = stream.readInt();
+     row++;
+   }
+   // publish the column only after every value has been read
+   values[columnIndex] = parsed;
+ }
+
+ /**
+  * Reads {@code size} 8-byte integers from {@code stream} and stores the resulting
+  * {@code long[]} in {@code values[columnIndex]}.
+  *
+  * @throws IOException if reading from the stream fails
+  */
+ private static void parseInt64Column(
+     int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException {
+   long[] parsed = new long[size];
+   int row = 0;
+   while (row < size) {
+     parsed[row] = stream.readLong();
+     row++;
+   }
+   // publish the column only after every value has been read
+   values[columnIndex] = parsed;
+ }
+
+ /**
+  * Reads {@code size} 4-byte floats from {@code stream} and stores the resulting
+  * {@code float[]} in {@code values[columnIndex]}.
+  *
+  * @throws IOException if reading from the stream fails
+  */
+ private static void parseFloatColumn(
+     int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException {
+   float[] parsed = new float[size];
+   int row = 0;
+   while (row < size) {
+     parsed[row] = stream.readFloat();
+     row++;
+   }
+   // publish the column only after every value has been read
+   values[columnIndex] = parsed;
+ }
+
+ /**
+  * Reads {@code size} 8-byte doubles from {@code stream} and stores the resulting
+  * {@code double[]} in {@code values[columnIndex]}.
+  *
+  * @throws IOException if reading from the stream fails
+  */
+ private static void parseDoubleColumn(
+     int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException {
+   double[] parsed = new double[size];
+   int row = 0;
+   while (row < size) {
+     parsed[row] = stream.readDouble();
+     row++;
+   }
+   // publish the column only after every value has been read
+   values[columnIndex] = parsed;
+ }
+
+ /**
+  * Reads {@code size} length-prefixed binary values from {@code stream} and stores the
+  * resulting {@code Binary[]} in {@code values[columnIndex]}.
+  *
+  * <p>Each value is a 4-byte length followed by that many bytes.
+  *
+  * @param size number of values to read
+  * @param stream the stream to read from
+  * @param values destination array of columns
+  * @param columnIndex slot in {@code values} that receives the parsed column
+  * @throws IOException if reading from the stream fails
+  * @throws IllegalStateException if the stream ends before a value is complete
+  */
+ private static void parseTextColumn(
+     int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException {
+   Binary[] binaryValues = new Binary[size];
+   for (int index = 0; index < size; index++) {
+     int binarySize = stream.readInt();
+     byte[] binaryValue = new byte[binarySize];
+     // A single read() may legally return fewer bytes than requested, so keep reading
+     // until the value is complete or the stream is exhausted.
+     int actualReadSize = 0;
+     while (actualReadSize < binarySize) {
+       int readNow = stream.read(binaryValue, actualReadSize, binarySize - actualReadSize);
+       if (readNow < 0) {
+         break;
+       }
+       actualReadSize += readNow;
+     }
+     if (actualReadSize != binarySize) {
+       throw new IllegalStateException(
+           "Expect to read " + binarySize + " bytes, actually read " + actualReadSize + " bytes.");
+     }
+     binaryValues[index] = new Binary(binaryValue);
+   }
+   values[columnIndex] = binaryValues;
+ }
}