This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9fc13b6 add field nullable check (#881)
9fc13b6 is described below
commit 9fc13b6bc46128b5f64e74f3a59cababd456ea17
Author: Dawei Liu <[email protected]>
AuthorDate: Wed Mar 4 20:07:51 2020 +0800
add field nullable check (#881)
* add field nullable check
---
.../iotdb/hadoop/tsfile/TSFRecordReader.java | 137 +++++++++++----------
.../watermark/GroupedLSBWatermarkEncoder.java | 2 +-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 24 ++--
3 files changed, 87 insertions(+), 76 deletions(-)
diff --git
a/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java
b/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java
index 146b80b..15b6e8e 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java
@@ -57,15 +57,15 @@ public class TSFRecordReader extends
RecordReader<NullWritable, MapWritable> imp
*/
private List<QueryDataSet> dataSetList = new ArrayList<>();
/**
- * List for name of devices. The order corresponds to the order of
dataSetList.
- * Means that deviceIdList[i] is the name of device for dataSetList[i].
+ * List for name of devices. The order corresponds to the order of
dataSetList. Means that
+ * deviceIdList[i] is the name of device for dataSetList[i].
*/
private List<String> deviceIdList = new ArrayList<>();
private List<Field> fields = null;
/**
* The index of QueryDataSet that is currently processed
*/
- private int currentIndex = 0;
+ private int currentIndex = 0;
private long timestamp = 0;
private boolean isReadDeviceId = false;
private boolean isReadTime = false;
@@ -77,67 +77,71 @@ public class TSFRecordReader extends
RecordReader<NullWritable, MapWritable> imp
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException {
if (split instanceof TSFInputSplit) {
- initialize((TSFInputSplit) split, context.getConfiguration(), this,
dataSetList, deviceIdList);
- }
- else {
+ initialize((TSFInputSplit) split, context.getConfiguration(), this,
dataSetList,
+ deviceIdList);
+ } else {
logger.error("The InputSplit class is not {}, the class is {}",
TSFInputSplit.class.getName(),
- split.getClass().getName());
+ split.getClass().getName());
throw new InternalError(String.format("The InputSplit class is not %s,
the class is %s",
- TSFInputSplit.class.getName(), split.getClass().getName()));
+ TSFInputSplit.class.getName(), split.getClass().getName()));
}
}
- public static void initialize(TSFInputSplit split, Configuration
configuration, IReaderSet readerSet, List<QueryDataSet> dataSetList,
List<String> deviceIdList) throws IOException {
- org.apache.hadoop.fs.Path path = split.getPath();
- List<TSFInputSplit.ChunkGroupInfo> chunkGroupInfoList =
split.getChunkGroupInfoList();
- TsFileSequenceReader reader = new TsFileSequenceReader(new
HDFSInput(path, configuration));
- readerSet.setReader(reader);
- // Get the read columns and filter information
-
- List<String> deviceIds = TSFInputFormat.getReadDeviceIds(configuration);
- if (deviceIds == null) {
- deviceIds = initDeviceIdList(chunkGroupInfoList);
- }
- List<String> measurementIds =
TSFInputFormat.getReadMeasurementIds(configuration);
- if (measurementIds == null) {
- measurementIds = initSensorIdList(chunkGroupInfoList);
- }
- readerSet.setMeasurementIds(measurementIds);
- logger.info("deviceIds:" + deviceIds);
- logger.info("Sensors:" + measurementIds);
+ public static void initialize(TSFInputSplit split, Configuration
configuration,
+ IReaderSet readerSet, List<QueryDataSet> dataSetList, List<String>
deviceIdList)
+ throws IOException {
+ org.apache.hadoop.fs.Path path = split.getPath();
+ List<TSFInputSplit.ChunkGroupInfo> chunkGroupInfoList =
split.getChunkGroupInfoList();
+ TsFileSequenceReader reader = new TsFileSequenceReader(new HDFSInput(path,
configuration));
+ readerSet.setReader(reader);
+ // Get the read columns and filter information
+ List<String> deviceIds = TSFInputFormat.getReadDeviceIds(configuration);
+ if (deviceIds == null) {
+ deviceIds = initDeviceIdList(chunkGroupInfoList);
+ }
+ List<String> measurementIds =
TSFInputFormat.getReadMeasurementIds(configuration);
+ if (measurementIds == null) {
+ measurementIds = initSensorIdList(chunkGroupInfoList);
+ }
+ readerSet.setMeasurementIds(measurementIds);
+ logger.info("deviceIds:" + deviceIds);
+ logger.info("Sensors:" + measurementIds);
- readerSet.setReadDeviceId(TSFInputFormat.getReadDeviceId(configuration));
- readerSet.setReadTime(TSFInputFormat.getReadTime(configuration));
+ readerSet.setReadDeviceId(TSFInputFormat.getReadDeviceId(configuration));
+ readerSet.setReadTime(TSFInputFormat.getReadTime(configuration));
- ReadOnlyTsFile queryEngine = new ReadOnlyTsFile(reader);
- for (TSFInputSplit.ChunkGroupInfo chunkGroupInfo : chunkGroupInfoList) {
- String deviceId = chunkGroupInfo.getDeviceId();
- if (deviceIds.contains(deviceId)) {
- List<Path> paths = measurementIds.stream()
- .map(measurementId -> new Path(deviceId +
TsFileConstant.PATH_SEPARATOR + measurementId))
- .collect(toList());
- QueryExpression queryExpression = QueryExpression.create(paths,
null);
- QueryDataSet dataSet = queryEngine.query(queryExpression,
- chunkGroupInfo.getStartOffset(),
chunkGroupInfo.getEndOffset());
- dataSetList.add(dataSet);
- deviceIdList.add(deviceId);
- }
+ ReadOnlyTsFile queryEngine = new ReadOnlyTsFile(reader);
+ for (TSFInputSplit.ChunkGroupInfo chunkGroupInfo : chunkGroupInfoList) {
+ String deviceId = chunkGroupInfo.getDeviceId();
+ if (deviceIds.contains(deviceId)) {
+ List<Path> paths = measurementIds.stream()
+ .map(
+ measurementId -> new Path(deviceId +
TsFileConstant.PATH_SEPARATOR + measurementId))
+ .collect(toList());
+ QueryExpression queryExpression = QueryExpression.create(paths, null);
+ QueryDataSet dataSet = queryEngine.query(queryExpression,
+ chunkGroupInfo.getStartOffset(), chunkGroupInfo.getEndOffset());
+ dataSetList.add(dataSet);
+ deviceIdList.add(deviceId);
}
+ }
}
- private static List<String>
initDeviceIdList(List<TSFInputSplit.ChunkGroupInfo> chunkGroupInfoList) {
+ private static List<String> initDeviceIdList(
+ List<TSFInputSplit.ChunkGroupInfo> chunkGroupInfoList) {
return chunkGroupInfoList.stream()
- .map(TSFInputSplit.ChunkGroupInfo::getDeviceId)
- .distinct()
- .collect(toList());
+ .map(TSFInputSplit.ChunkGroupInfo::getDeviceId)
+ .distinct()
+ .collect(toList());
}
- private static List<String>
initSensorIdList(List<TSFInputSplit.ChunkGroupInfo> chunkGroupInfoList) {
+ private static List<String> initSensorIdList(
+ List<TSFInputSplit.ChunkGroupInfo> chunkGroupInfoList) {
return chunkGroupInfoList.stream()
- .flatMap(chunkGroupMetaData ->
Arrays.stream(chunkGroupMetaData.getMeasurementIds()))
- .distinct()
- .collect(toList());
+ .flatMap(chunkGroupMetaData ->
Arrays.stream(chunkGroupMetaData.getMeasurementIds()))
+ .distinct()
+ .collect(toList());
}
@Override
@@ -145,8 +149,7 @@ public class TSFRecordReader extends
RecordReader<NullWritable, MapWritable> imp
while (currentIndex < dataSetList.size()) {
if (!dataSetList.get(currentIndex).hasNext()) {
currentIndex++;
- }
- else {
+ } else {
RowRecord rowRecord = dataSetList.get(currentIndex).next();
fields = rowRecord.getFields();
timestamp = rowRecord.getTimestamp();
@@ -163,13 +166,14 @@ public class TSFRecordReader extends
RecordReader<NullWritable, MapWritable> imp
@Override
public MapWritable getCurrentValue() throws InterruptedException {
- return getCurrentValue(deviceIdList, currentIndex, timestamp, isReadTime,
isReadDeviceId, fields, measurementIds);
+ return getCurrentValue(deviceIdList, currentIndex, timestamp, isReadTime,
isReadDeviceId,
+ fields, measurementIds);
}
public static MapWritable getCurrentValue(List<String> deviceIdList, int
currentIndex,
- long timestamp, boolean isReadTime,
- boolean isReadDeviceId,
List<Field> fields,
- List<String> measurementIds)
throws InterruptedException {
+ long timestamp, boolean isReadTime,
+ boolean isReadDeviceId, List<Field> fields,
+ List<String> measurementIds) throws InterruptedException {
MapWritable mapWritable = new MapWritable();
Text deviceIdText = new Text(deviceIdList.get(currentIndex));
LongWritable time = new LongWritable(timestamp);
@@ -188,13 +192,15 @@ public class TSFRecordReader extends
RecordReader<NullWritable, MapWritable> imp
/**
* Read from current fields value
+ *
* @param mapWritable where to write
* @throws InterruptedException
*/
- public static void readFieldsValue(MapWritable mapWritable, List<Field>
fields, List<String> measurementIds) throws InterruptedException {
+ public static void readFieldsValue(MapWritable mapWritable, List<Field>
fields,
+ List<String> measurementIds) throws InterruptedException {
int index = 0;
for (Field field : fields) {
- if (field.getDataType() == null) {
+ if (field == null || field.getDataType() == null) {
logger.info("Current value is null");
mapWritable.put(new Text(measurementIds.get(index)),
NullWritable.get());
} else {
@@ -203,24 +209,29 @@ public class TSFRecordReader extends
RecordReader<NullWritable, MapWritable> imp
mapWritable.put(new Text(measurementIds.get(index)), new
IntWritable(field.getIntV()));
break;
case INT64:
- mapWritable.put(new Text(measurementIds.get(index)), new
LongWritable(field.getLongV()));
+ mapWritable
+ .put(new Text(measurementIds.get(index)), new
LongWritable(field.getLongV()));
break;
case FLOAT:
- mapWritable.put(new Text(measurementIds.get(index)), new
FloatWritable(field.getFloatV()));
+ mapWritable
+ .put(new Text(measurementIds.get(index)), new
FloatWritable(field.getFloatV()));
break;
case DOUBLE:
- mapWritable.put(new Text(measurementIds.get(index)), new
DoubleWritable(field.getDoubleV()));
+ mapWritable
+ .put(new Text(measurementIds.get(index)), new
DoubleWritable(field.getDoubleV()));
break;
case BOOLEAN:
- mapWritable.put(new Text(measurementIds.get(index)), new
BooleanWritable(field.getBoolV()));
+ mapWritable
+ .put(new Text(measurementIds.get(index)), new
BooleanWritable(field.getBoolV()));
break;
case TEXT:
- mapWritable.put(new Text(measurementIds.get(index)), new
Text(field.getBinaryV().getStringValue()));
+ mapWritable.put(new Text(measurementIds.get(index)),
+ new Text(field.getBinaryV().getStringValue()));
break;
default:
logger.error("The data type is not support {}",
field.getDataType());
throw new InterruptedException(
- String.format("The data type %s is not support ",
field.getDataType()));
+ String.format("The data type %s is not support ",
field.getDataType()));
}
}
index++;
diff --git
a/server/src/main/java/org/apache/iotdb/db/tools/watermark/GroupedLSBWatermarkEncoder.java
b/server/src/main/java/org/apache/iotdb/db/tools/watermark/GroupedLSBWatermarkEncoder.java
index 842290e..4c6d3b6 100644
---
a/server/src/main/java/org/apache/iotdb/db/tools/watermark/GroupedLSBWatermarkEncoder.java
+++
b/server/src/main/java/org/apache/iotdb/db/tools/watermark/GroupedLSBWatermarkEncoder.java
@@ -113,7 +113,7 @@ public class GroupedLSBWatermarkEncoder implements
WatermarkEncoder {
}
List<Field> fields = record.getFields();
for (Field field : fields) {
- if (field.getDataType() == null) {
+ if (field == null || field.getDataType() == null) {
continue;
}
TSDataType dataType = field.getDataType();
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 4209ba3..c5a9114 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -49,7 +49,7 @@ public class QueryDataSetUtils {
* convert query data set by fetch size.
*
* @param queryDataSet -query dataset
- * @param fetchSize -fetch size
+ * @param fetchSize -fetch size
* @return -convert query dataset
*/
public static TSQueryDataSet convertQueryDataSetByFetchSize(QueryDataSet
queryDataSet,
@@ -86,11 +86,11 @@ public class QueryDataSetUtils {
List<Field> fields = rowRecord.getFields();
for (int k = 0; k < fields.size(); k++) {
Field field = fields.get(k);
- DataOutputStream dataOutputStream = dataOutputStreams[2*k + 1]; //
DO NOT FORGET +1
- if (field.getDataType() == null) {
- bitmap[k] = (bitmap[k] << 1);
+ DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1]; //
DO NOT FORGET +1
+ if (field == null || field.getDataType() == null) {
+ bitmap[k] = (bitmap[k] << 1);
} else {
- bitmap[k] = (bitmap[k] << 1) | flag;
+ bitmap[k] = (bitmap[k] << 1) | flag;
TSDataType type = field.getDataType();
switch (type) {
case INT32:
@@ -127,7 +127,7 @@ public class QueryDataSetUtils {
rowCount++;
if (rowCount % 8 == 0) {
for (int j = 0; j < bitmap.length; j++) {
- DataOutputStream dataBitmapOutputStream =
dataOutputStreams[2*(j+1)];
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (j
+ 1)];
dataBitmapOutputStream.writeByte(bitmap[j]);
// we should clear the bitmap every 8 row record
bitmap[j] = 0;
@@ -142,8 +142,8 @@ public class QueryDataSetUtils {
int remaining = rowCount % 8;
if (remaining != 0) {
for (int j = 0; j < bitmap.length; j++) {
- DataOutputStream dataBitmapOutputStream = dataOutputStreams[2*(j+1)];
- dataBitmapOutputStream.writeByte(bitmap[j] << (8-remaining));
+ DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (j +
1)];
+ dataBitmapOutputStream.writeByte(bitmap[j] << (8 - remaining));
}
}
@@ -160,13 +160,13 @@ public class QueryDataSetUtils {
List<ByteBuffer> bitmapList = new LinkedList<>();
List<ByteBuffer> valueList = new LinkedList<>();
for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
- ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i-1)/2]);
+ ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) /
2]);
valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
valueBuffer.flip();
valueList.add(valueBuffer);
ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
- bitmapBuffer.put(byteArrayOutputStreams[i+1].toByteArray());
+ bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
bitmapBuffer.flip();
bitmapList.add(bitmapBuffer);
}
@@ -194,9 +194,9 @@ public class QueryDataSetUtils {
}
/**
- * @param buffer data values
+ * @param buffer data values
* @param columns column number
- * @param size value count in each column
+ * @param size value count in each column
*/
public static Object[] readValuesFromBuffer(ByteBuffer buffer, TSDataType[]
types,
int columns, int size) {