This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fc13b6  add filed nullable check (#881)
9fc13b6 is described below

commit 9fc13b6bc46128b5f64e74f3a59cababd456ea17
Author: Dawei Liu <[email protected]>
AuthorDate: Wed Mar 4 20:07:51 2020 +0800

    add filed nullable check (#881)
    
    * add filed nullable check
---
 .../iotdb/hadoop/tsfile/TSFRecordReader.java       | 137 +++++++++++----------
 .../watermark/GroupedLSBWatermarkEncoder.java      |   2 +-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  24 ++--
 3 files changed, 87 insertions(+), 76 deletions(-)

diff --git 
a/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java 
b/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java
index 146b80b..15b6e8e 100644
--- a/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java
+++ b/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFRecordReader.java
@@ -57,15 +57,15 @@ public class TSFRecordReader extends 
RecordReader<NullWritable, MapWritable> imp
    */
   private List<QueryDataSet> dataSetList = new ArrayList<>();
   /**
-   * List for name of devices. The order corresponds to the order of 
dataSetList.
-   * Means that deviceIdList[i] is the name of device for dataSetList[i].
+   * List for name of devices. The order corresponds to the order of 
dataSetList. Means that
+   * deviceIdList[i] is the name of device for dataSetList[i].
    */
   private List<String> deviceIdList = new ArrayList<>();
   private List<Field> fields = null;
   /**
    * The index of QueryDataSet that is currently processed
    */
-  private  int currentIndex = 0;
+  private int currentIndex = 0;
   private long timestamp = 0;
   private boolean isReadDeviceId = false;
   private boolean isReadTime = false;
@@ -77,67 +77,71 @@ public class TSFRecordReader extends 
RecordReader<NullWritable, MapWritable> imp
   public void initialize(InputSplit split, TaskAttemptContext context)
       throws IOException {
     if (split instanceof TSFInputSplit) {
-      initialize((TSFInputSplit) split, context.getConfiguration(), this, 
dataSetList, deviceIdList);
-    }
-    else {
+      initialize((TSFInputSplit) split, context.getConfiguration(), this, 
dataSetList,
+          deviceIdList);
+    } else {
       logger.error("The InputSplit class is not {}, the class is {}", 
TSFInputSplit.class.getName(),
-              split.getClass().getName());
+          split.getClass().getName());
       throw new InternalError(String.format("The InputSplit class is not %s, 
the class is %s",
-              TSFInputSplit.class.getName(), split.getClass().getName()));
+          TSFInputSplit.class.getName(), split.getClass().getName()));
     }
   }
 
-  public static void initialize(TSFInputSplit split, Configuration 
configuration, IReaderSet readerSet, List<QueryDataSet> dataSetList, 
List<String> deviceIdList) throws IOException  {
-      org.apache.hadoop.fs.Path path = split.getPath();
-      List<TSFInputSplit.ChunkGroupInfo> chunkGroupInfoList = 
split.getChunkGroupInfoList();
-      TsFileSequenceReader reader = new TsFileSequenceReader(new 
HDFSInput(path, configuration));
-      readerSet.setReader(reader);
-      // Get the read columns and filter information
-
-      List<String> deviceIds = TSFInputFormat.getReadDeviceIds(configuration);
-      if (deviceIds == null) {
-        deviceIds = initDeviceIdList(chunkGroupInfoList);
-      }
-      List<String> measurementIds = 
TSFInputFormat.getReadMeasurementIds(configuration);
-      if (measurementIds == null) {
-        measurementIds = initSensorIdList(chunkGroupInfoList);
-      }
-      readerSet.setMeasurementIds(measurementIds);
-      logger.info("deviceIds:" + deviceIds);
-      logger.info("Sensors:" + measurementIds);
+  public static void initialize(TSFInputSplit split, Configuration 
configuration,
+      IReaderSet readerSet, List<QueryDataSet> dataSetList, List<String> 
deviceIdList)
+      throws IOException {
+    org.apache.hadoop.fs.Path path = split.getPath();
+    List<TSFInputSplit.ChunkGroupInfo> chunkGroupInfoList = 
split.getChunkGroupInfoList();
+    TsFileSequenceReader reader = new TsFileSequenceReader(new HDFSInput(path, 
configuration));
+    readerSet.setReader(reader);
+    // Get the read columns and filter information
 
+    List<String> deviceIds = TSFInputFormat.getReadDeviceIds(configuration);
+    if (deviceIds == null) {
+      deviceIds = initDeviceIdList(chunkGroupInfoList);
+    }
+    List<String> measurementIds = 
TSFInputFormat.getReadMeasurementIds(configuration);
+    if (measurementIds == null) {
+      measurementIds = initSensorIdList(chunkGroupInfoList);
+    }
+    readerSet.setMeasurementIds(measurementIds);
+    logger.info("deviceIds:" + deviceIds);
+    logger.info("Sensors:" + measurementIds);
 
-      readerSet.setReadDeviceId(TSFInputFormat.getReadDeviceId(configuration));
-      readerSet.setReadTime(TSFInputFormat.getReadTime(configuration));
+    readerSet.setReadDeviceId(TSFInputFormat.getReadDeviceId(configuration));
+    readerSet.setReadTime(TSFInputFormat.getReadTime(configuration));
 
-      ReadOnlyTsFile queryEngine = new ReadOnlyTsFile(reader);
-      for (TSFInputSplit.ChunkGroupInfo chunkGroupInfo : chunkGroupInfoList) {
-        String deviceId = chunkGroupInfo.getDeviceId();
-        if (deviceIds.contains(deviceId)) {
-          List<Path> paths = measurementIds.stream()
-                  .map(measurementId -> new Path(deviceId + 
TsFileConstant.PATH_SEPARATOR + measurementId))
-                  .collect(toList());
-          QueryExpression queryExpression = QueryExpression.create(paths, 
null);
-          QueryDataSet dataSet = queryEngine.query(queryExpression,
-                  chunkGroupInfo.getStartOffset(), 
chunkGroupInfo.getEndOffset());
-          dataSetList.add(dataSet);
-          deviceIdList.add(deviceId);
-        }
+    ReadOnlyTsFile queryEngine = new ReadOnlyTsFile(reader);
+    for (TSFInputSplit.ChunkGroupInfo chunkGroupInfo : chunkGroupInfoList) {
+      String deviceId = chunkGroupInfo.getDeviceId();
+      if (deviceIds.contains(deviceId)) {
+        List<Path> paths = measurementIds.stream()
+            .map(
+                measurementId -> new Path(deviceId + 
TsFileConstant.PATH_SEPARATOR + measurementId))
+            .collect(toList());
+        QueryExpression queryExpression = QueryExpression.create(paths, null);
+        QueryDataSet dataSet = queryEngine.query(queryExpression,
+            chunkGroupInfo.getStartOffset(), chunkGroupInfo.getEndOffset());
+        dataSetList.add(dataSet);
+        deviceIdList.add(deviceId);
       }
+    }
   }
 
-  private static List<String> 
initDeviceIdList(List<TSFInputSplit.ChunkGroupInfo> chunkGroupInfoList) {
+  private static List<String> initDeviceIdList(
+      List<TSFInputSplit.ChunkGroupInfo> chunkGroupInfoList) {
     return chunkGroupInfoList.stream()
-            .map(TSFInputSplit.ChunkGroupInfo::getDeviceId)
-            .distinct()
-            .collect(toList());
+        .map(TSFInputSplit.ChunkGroupInfo::getDeviceId)
+        .distinct()
+        .collect(toList());
   }
 
-  private static List<String> 
initSensorIdList(List<TSFInputSplit.ChunkGroupInfo> chunkGroupInfoList) {
+  private static List<String> initSensorIdList(
+      List<TSFInputSplit.ChunkGroupInfo> chunkGroupInfoList) {
     return chunkGroupInfoList.stream()
-            .flatMap(chunkGroupMetaData -> 
Arrays.stream(chunkGroupMetaData.getMeasurementIds()))
-            .distinct()
-            .collect(toList());
+        .flatMap(chunkGroupMetaData -> 
Arrays.stream(chunkGroupMetaData.getMeasurementIds()))
+        .distinct()
+        .collect(toList());
   }
 
   @Override
@@ -145,8 +149,7 @@ public class TSFRecordReader extends 
RecordReader<NullWritable, MapWritable> imp
     while (currentIndex < dataSetList.size()) {
       if (!dataSetList.get(currentIndex).hasNext()) {
         currentIndex++;
-      }
-      else {
+      } else {
         RowRecord rowRecord = dataSetList.get(currentIndex).next();
         fields = rowRecord.getFields();
         timestamp = rowRecord.getTimestamp();
@@ -163,13 +166,14 @@ public class TSFRecordReader extends 
RecordReader<NullWritable, MapWritable> imp
 
   @Override
   public MapWritable getCurrentValue() throws InterruptedException {
-    return getCurrentValue(deviceIdList, currentIndex, timestamp, isReadTime, 
isReadDeviceId, fields, measurementIds);
+    return getCurrentValue(deviceIdList, currentIndex, timestamp, isReadTime, 
isReadDeviceId,
+        fields, measurementIds);
   }
 
   public static MapWritable getCurrentValue(List<String> deviceIdList, int 
currentIndex,
-                                            long timestamp, boolean isReadTime,
-                                            boolean isReadDeviceId, 
List<Field> fields,
-                                            List<String> measurementIds)  
throws InterruptedException {
+      long timestamp, boolean isReadTime,
+      boolean isReadDeviceId, List<Field> fields,
+      List<String> measurementIds) throws InterruptedException {
     MapWritable mapWritable = new MapWritable();
     Text deviceIdText = new Text(deviceIdList.get(currentIndex));
     LongWritable time = new LongWritable(timestamp);
@@ -188,13 +192,15 @@ public class TSFRecordReader extends 
RecordReader<NullWritable, MapWritable> imp
 
   /**
    * Read from current fields value
+   *
    * @param mapWritable where to write
    * @throws InterruptedException
    */
-  public static void readFieldsValue(MapWritable mapWritable, List<Field> 
fields, List<String> measurementIds) throws InterruptedException {
+  public static void readFieldsValue(MapWritable mapWritable, List<Field> 
fields,
+      List<String> measurementIds) throws InterruptedException {
     int index = 0;
     for (Field field : fields) {
-      if (field.getDataType() == null) {
+      if (field == null || field.getDataType() == null) {
         logger.info("Current value is null");
         mapWritable.put(new Text(measurementIds.get(index)), 
NullWritable.get());
       } else {
@@ -203,24 +209,29 @@ public class TSFRecordReader extends 
RecordReader<NullWritable, MapWritable> imp
             mapWritable.put(new Text(measurementIds.get(index)), new 
IntWritable(field.getIntV()));
             break;
           case INT64:
-            mapWritable.put(new Text(measurementIds.get(index)), new 
LongWritable(field.getLongV()));
+            mapWritable
+                .put(new Text(measurementIds.get(index)), new 
LongWritable(field.getLongV()));
             break;
           case FLOAT:
-            mapWritable.put(new Text(measurementIds.get(index)), new 
FloatWritable(field.getFloatV()));
+            mapWritable
+                .put(new Text(measurementIds.get(index)), new 
FloatWritable(field.getFloatV()));
             break;
           case DOUBLE:
-            mapWritable.put(new Text(measurementIds.get(index)), new 
DoubleWritable(field.getDoubleV()));
+            mapWritable
+                .put(new Text(measurementIds.get(index)), new 
DoubleWritable(field.getDoubleV()));
             break;
           case BOOLEAN:
-            mapWritable.put(new Text(measurementIds.get(index)), new 
BooleanWritable(field.getBoolV()));
+            mapWritable
+                .put(new Text(measurementIds.get(index)), new 
BooleanWritable(field.getBoolV()));
             break;
           case TEXT:
-            mapWritable.put(new Text(measurementIds.get(index)), new 
Text(field.getBinaryV().getStringValue()));
+            mapWritable.put(new Text(measurementIds.get(index)),
+                new Text(field.getBinaryV().getStringValue()));
             break;
           default:
             logger.error("The data type is not support {}", 
field.getDataType());
             throw new InterruptedException(
-                    String.format("The data type %s is not support ", 
field.getDataType()));
+                String.format("The data type %s is not support ", 
field.getDataType()));
         }
       }
       index++;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/tools/watermark/GroupedLSBWatermarkEncoder.java
 
b/server/src/main/java/org/apache/iotdb/db/tools/watermark/GroupedLSBWatermarkEncoder.java
index 842290e..4c6d3b6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/tools/watermark/GroupedLSBWatermarkEncoder.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/tools/watermark/GroupedLSBWatermarkEncoder.java
@@ -113,7 +113,7 @@ public class GroupedLSBWatermarkEncoder implements 
WatermarkEncoder {
     }
     List<Field> fields = record.getFields();
     for (Field field : fields) {
-      if (field.getDataType() == null) {
+      if (field == null || field.getDataType() == null) {
         continue;
       }
       TSDataType dataType = field.getDataType();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java 
b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 4209ba3..c5a9114 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -49,7 +49,7 @@ public class QueryDataSetUtils {
    * convert query data set by fetch size.
    *
    * @param queryDataSet -query dataset
-   * @param fetchSize -fetch size
+   * @param fetchSize    -fetch size
    * @return -convert query dataset
    */
   public static TSQueryDataSet convertQueryDataSetByFetchSize(QueryDataSet 
queryDataSet,
@@ -86,11 +86,11 @@ public class QueryDataSetUtils {
         List<Field> fields = rowRecord.getFields();
         for (int k = 0; k < fields.size(); k++) {
           Field field = fields.get(k);
-          DataOutputStream dataOutputStream = dataOutputStreams[2*k + 1]; // 
DO NOT FORGET +1
-          if (field.getDataType() == null) {
-            bitmap[k] =  (bitmap[k] << 1);
+          DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1]; // 
DO NOT FORGET +1
+          if (field == null || field.getDataType() == null) {
+            bitmap[k] = (bitmap[k] << 1);
           } else {
-            bitmap[k] =  (bitmap[k] << 1) | flag;
+            bitmap[k] = (bitmap[k] << 1) | flag;
             TSDataType type = field.getDataType();
             switch (type) {
               case INT32:
@@ -127,7 +127,7 @@ public class QueryDataSetUtils {
         rowCount++;
         if (rowCount % 8 == 0) {
           for (int j = 0; j < bitmap.length; j++) {
-            DataOutputStream dataBitmapOutputStream = 
dataOutputStreams[2*(j+1)];
+            DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (j 
+ 1)];
             dataBitmapOutputStream.writeByte(bitmap[j]);
             // we should clear the bitmap every 8 row record
             bitmap[j] = 0;
@@ -142,8 +142,8 @@ public class QueryDataSetUtils {
     int remaining = rowCount % 8;
     if (remaining != 0) {
       for (int j = 0; j < bitmap.length; j++) {
-        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2*(j+1)];
-        dataBitmapOutputStream.writeByte(bitmap[j] << (8-remaining));
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (j + 
1)];
+        dataBitmapOutputStream.writeByte(bitmap[j] << (8 - remaining));
       }
     }
 
@@ -160,13 +160,13 @@ public class QueryDataSetUtils {
     List<ByteBuffer> bitmapList = new LinkedList<>();
     List<ByteBuffer> valueList = new LinkedList<>();
     for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
-      ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i-1)/2]);
+      ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 
2]);
       valueBuffer.put(byteArrayOutputStreams[i].toByteArray());
       valueBuffer.flip();
       valueList.add(valueBuffer);
 
       ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation);
-      bitmapBuffer.put(byteArrayOutputStreams[i+1].toByteArray());
+      bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray());
       bitmapBuffer.flip();
       bitmapList.add(bitmapBuffer);
     }
@@ -194,9 +194,9 @@ public class QueryDataSetUtils {
   }
 
   /**
-   * @param buffer data values
+   * @param buffer  data values
    * @param columns column number
-   * @param size value count in each column
+   * @param size    value count in each column
    */
   public static Object[] readValuesFromBuffer(ByteBuffer buffer, TSDataType[] 
types,
       int columns, int size) {

Reply via email to