[CARBONDATA-2637][DataMap] Fix bugs in rebuild datamap

In cluster mode, readCommitScope is null while rebuilding the datamap
for segments, which causes an NPE. Here we use the original segment
object, whose readCommitScope is not null, so the rebuild works fine.

This closes #2493
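
A minimal sketch of the pattern, using a hypothetical stripped-down
Segment stand-in (illustration only; the real class lives in CarbonData):
instead of trusting the Segment deserialized from the input split, the
executor resolves the original driver-side Segment by its segment number,
whose readCommitScope is still populated.

    // Hypothetical, simplified stand-in for CarbonData's Segment,
    // for illustration only.
    case class Segment(segmentNo: String, readCommitScope: AnyRef)

    object RebuildLookupSketch extends App {
      // Segment rebuilt from the serialized input split: in cluster
      // mode its readCommitScope arrives as null.
      val fromSplit = Segment("2", readCommitScope = null)

      // Driver-side segment list captured by the RDD; these objects
      // still carry a valid readCommitScope.
      val segments = Seq(Segment("1", new Object), Segment("2", new Object))

      // The fix: resolve the original segment by segment number and
      // use it instead of the deserialized one.
      val segment = segments.find(_.segmentNo == fromSplit.segmentNo)
      assert(segment.exists(_.readCommitScope != null))
    }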


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9d7a9a2a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9d7a9a2a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9d7a9a2a

Branch: refs/heads/carbonstore
Commit: 9d7a9a2a96b11d9d12b30d13925737c8c3400ab6
Parents: 202d099
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Wed Jul 11 22:24:54 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Thu Jul 12 17:24:06 2018 +0800

----------------------------------------------------------------------
 .../datamap/IndexDataMapRebuildRDD.scala        | 143 ++++++++++---------
 1 file changed, 73 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d7a9a2a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 688656d..85466f1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -275,80 +275,83 @@ class IndexDataMapRebuildRDD[K, V](
     val inputMetrics = new CarbonInputMetrics
     TaskMetricsMap.getInstance().registerThreadCallback()
     val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
-    val segment = inputSplit.getAllSplits.get(0).getSegment
-    inputMetrics.initBytesReadCallback(context, inputSplit)
-
-    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-    val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
-    val format = createInputFormat(segment, attemptContext)
-
-    val model = format.createQueryModel(inputSplit, attemptContext)
-    // one query id per table
-    model.setQueryId(queryId)
-    model.setVectorReader(false)
-    model.setRequiredRowId(true)
-
-    var reader: CarbonRecordReader[Array[Object]] = null
-    var refresher: DataMapBuilder = null
-    try {
-      val segmentPropertiesFetcher = DataMapStoreManager.getInstance().getDataMap(carbonTable,
-        BlockletDataMapFactory.DATA_MAP_SCHEMA).getDataMapFactory
-        .asInstanceOf[SegmentPropertiesFetcher]
-      val segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment)
-
-      // we use task name as shard name to create the folder for this datamap
-      val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath)
-      refresher = dataMapFactory.createBuilder(segment, shardName, segmentProperties)
-      refresher.initialize()
-
-      model.setForcedDetailRawQuery(refresher.isIndexForCarbonRawBytes)
-      val readSupport = if (refresher.isIndexForCarbonRawBytes) {
-        new RawBytesReadSupport(segmentProperties, indexColumns)
-      } else {
-        new OriginalReadSupport(indexColumns.map(_.getDataType))
-      }
-      reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics)
-      reader.initialize(inputSplit, attemptContext)
-      // skip clear datamap and we will do this after rebuild
-      reader.setSkipClearDataMapAtClose(true)
-
-      var blockletId = 0
-      var firstRow = true
-      while (reader.nextKeyValue()) {
-        val rowWithPosition = reader.getCurrentValue
-        val size = rowWithPosition.length
-        val pageId = rowWithPosition(size - 2).asInstanceOf[Int]
-        val rowId = rowWithPosition(size - 1).asInstanceOf[Int]
-
-        if (!firstRow && pageId == 0 && rowId == 0) {
-          // new blocklet started, increase blockletId
-          blockletId = blockletId + 1
+    val segmentId = inputSplit.getAllSplits.get(0).getSegment.getSegmentNo
+    val segment = segments.find(p => p.getSegmentNo.equals(segmentId))
+    if (segment.isDefined) {
+      inputMetrics.initBytesReadCallback(context, inputSplit)
+
+      val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+      val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+      val format = createInputFormat(segment.get, attemptContext)
+
+      val model = format.createQueryModel(inputSplit, attemptContext)
+      // one query id per table
+      model.setQueryId(queryId)
+      model.setVectorReader(false)
+      model.setRequiredRowId(true)
+
+      var reader: CarbonRecordReader[Array[Object]] = null
+      var refresher: DataMapBuilder = null
+      try {
+        val segmentPropertiesFetcher = DataMapStoreManager.getInstance().getDataMap(carbonTable,
+          BlockletDataMapFactory.DATA_MAP_SCHEMA).getDataMapFactory
+          .asInstanceOf[SegmentPropertiesFetcher]
+        val segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment.get)
+
+        // we use task name as shard name to create the folder for this datamap
+        val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath)
+        refresher = dataMapFactory.createBuilder(segment.get, shardName, segmentProperties)
+        refresher.initialize()
+
+        model.setForcedDetailRawQuery(refresher.isIndexForCarbonRawBytes)
+        val readSupport = if (refresher.isIndexForCarbonRawBytes) {
+          new RawBytesReadSupport(segmentProperties, indexColumns)
         } else {
-          firstRow = false
+          new OriginalReadSupport(indexColumns.map(_.getDataType))
         }
+        reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics)
+        reader.initialize(inputSplit, attemptContext)
+        // skip clear datamap and we will do this after rebuild
+        reader.setSkipClearDataMapAtClose(true)
+
+        var blockletId = 0
+        var firstRow = true
+        while (reader.nextKeyValue()) {
+          val rowWithPosition = reader.getCurrentValue
+          val size = rowWithPosition.length
+          val pageId = rowWithPosition(size - 2).asInstanceOf[Int]
+          val rowId = rowWithPosition(size - 1).asInstanceOf[Int]
+
+          if (!firstRow && pageId == 0 && rowId == 0) {
+            // new blocklet started, increase blockletId
+            blockletId = blockletId + 1
+          } else {
+            firstRow = false
+          }
 
-        refresher.addRow(blockletId, pageId, rowId, rowWithPosition)
-      }
+          refresher.addRow(blockletId, pageId, rowId, rowWithPosition)
+        }
 
-      refresher.finish()
+        refresher.finish()
 
-      status = true
-    } finally {
-      if (reader != null) {
-        try {
-          reader.close()
-        } catch {
-          case ex: Throwable =>
-            LOGGER.error(ex, "Failed to close reader")
+        status = true
+      } finally {
+        if (reader != null) {
+          try {
+            reader.close()
+          } catch {
+            case ex: Throwable =>
+              LOGGER.error(ex, "Failed to close reader")
+          }
         }
-      }
 
-      if (refresher != null) {
-        try {
-          refresher.close()
-        } catch {
-          case ex: Throwable =>
-            LOGGER.error(ex, "Failed to close index writer")
+        if (refresher != null) {
+          try {
+            refresher.close()
+          } catch {
+            case ex: Throwable =>
+              LOGGER.error(ex, "Failed to close index writer")
+          }
         }
       }
     }
@@ -363,7 +366,7 @@ class IndexDataMapRebuildRDD[K, V](
 
       override def next(): (K, V) = {
         finished = true
-        result.getKey(split.index.toString, (segment.getSegmentNo, status))
+        result.getKey(split.index.toString, (segmentId, status))
       }
     }
   }
@@ -386,7 +389,7 @@ class IndexDataMapRebuildRDD[K, V](
 
     CarbonInputFormat.setSegmentsToAccess(
       conf,
-      Segment.toSegmentList(Array(segment.getSegmentNo), null))
+      List(segment).asJava)
 
     CarbonInputFormat.setColumnProjection(
       conf,
@@ -408,7 +411,7 @@ class IndexDataMapRebuildRDD[K, V](
 
     CarbonInputFormat.setSegmentsToAccess(
       job.getConfiguration,
-      Segment.toSegmentList(segments.map(_.getSegmentNo).toArray, null))
+      segments.toList.asJava)
 
     CarbonInputFormat.setTableInfo(
       job.getConfiguration,

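The last two hunks apply the same idea when registering segments with the
input format: Segment.toSegmentList(..., null) manufactures fresh Segment
objects with a null second argument, which, per the commit message, leaves
readCommitScope null, so the patch passes the existing instances through
instead. A short sketch of the changed call, taken from the diff and
assuming the surrounding IndexDataMapRebuildRDD context (the segments
field, the Hadoop job handle, and the JavaConverters import):

    import scala.collection.JavaConverters._

    // Before: rebuilding Segment objects from their numbers dropped the
    // readCommitScope:
    //   Segment.toSegmentList(segments.map(_.getSegmentNo).toArray, null)

    // After: hand the original instances, readCommitScope intact, to the
    // input format.
    CarbonInputFormat.setSegmentsToAccess(
      job.getConfiguration,
      segments.toList.asJava)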