Repository: carbondata
Updated Branches:
  refs/heads/master 202d099d6 -> 9d7a9a2a9
[CARBONDATA-2637][DataMap] Fix bugs in rebuild datamap

In cluster mode, readCommitScope is null while rebuilding the datamap for
segments, which causes an NPE. Here we use the original segment object,
whose readCommitScope is not null and works fine.

This closes #2493


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9d7a9a2a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9d7a9a2a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9d7a9a2a

Branch: refs/heads/master
Commit: 9d7a9a2a96b11d9d12b30d13925737c8c3400ab6
Parents: 202d099
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Wed Jul 11 22:24:54 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Thu Jul 12 17:24:06 2018 +0800

----------------------------------------------------------------------
 .../datamap/IndexDataMapRebuildRDD.scala | 143 ++++++++++---------
 1 file changed, 73 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9d7a9a2a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 688656d..85466f1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -275,80 +275,83 @@ class IndexDataMapRebuildRDD[K, V](
     val inputMetrics = new CarbonInputMetrics
     TaskMetricsMap.getInstance().registerThreadCallback()
     val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
-    val segment = inputSplit.getAllSplits.get(0).getSegment
-    inputMetrics.initBytesReadCallback(context, inputSplit)
-
-    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-    val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
-    val format = createInputFormat(segment, attemptContext)
-
-    val model = format.createQueryModel(inputSplit, attemptContext)
-    // one query id per table
-    model.setQueryId(queryId)
-    model.setVectorReader(false)
-    model.setRequiredRowId(true)
-
-    var reader: CarbonRecordReader[Array[Object]] = null
-    var refresher: DataMapBuilder = null
-    try {
-      val segmentPropertiesFetcher = DataMapStoreManager.getInstance().getDataMap(carbonTable,
-        BlockletDataMapFactory.DATA_MAP_SCHEMA).getDataMapFactory
-        .asInstanceOf[SegmentPropertiesFetcher]
-      val segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment)
-
-      // we use task name as shard name to create the folder for this datamap
-      val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath)
-      refresher = dataMapFactory.createBuilder(segment, shardName, segmentProperties)
-      refresher.initialize()
-
-      model.setForcedDetailRawQuery(refresher.isIndexForCarbonRawBytes)
-      val readSupport = if (refresher.isIndexForCarbonRawBytes) {
-        new RawBytesReadSupport(segmentProperties, indexColumns)
-      } else {
-        new OriginalReadSupport(indexColumns.map(_.getDataType))
-      }
-      reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics)
-      reader.initialize(inputSplit, attemptContext)
-      // skip clear datamap and we will do this after rebuild
-      reader.setSkipClearDataMapAtClose(true)
-
-      var blockletId = 0
-      var firstRow = true
-      while (reader.nextKeyValue()) {
-        val rowWithPosition = reader.getCurrentValue
-        val size = rowWithPosition.length
-        val pageId = rowWithPosition(size - 2).asInstanceOf[Int]
-        val rowId = rowWithPosition(size - 1).asInstanceOf[Int]
-
-        if (!firstRow && pageId == 0 && rowId == 0) {
-          // new blocklet started, increase blockletId
-          blockletId = blockletId + 1
+    val segmentId = inputSplit.getAllSplits.get(0).getSegment.getSegmentNo
+    val segment = segments.find(p => p.getSegmentNo.equals(segmentId))
+    if (segment.isDefined) {
+      inputMetrics.initBytesReadCallback(context, inputSplit)
+
+      val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+      val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
+      val format = createInputFormat(segment.get, attemptContext)
+
+      val model = format.createQueryModel(inputSplit, attemptContext)
+      // one query id per table
+      model.setQueryId(queryId)
+      model.setVectorReader(false)
+      model.setRequiredRowId(true)
+
+      var reader: CarbonRecordReader[Array[Object]] = null
+      var refresher: DataMapBuilder = null
+      try {
+        val segmentPropertiesFetcher = DataMapStoreManager.getInstance().getDataMap(carbonTable,
+          BlockletDataMapFactory.DATA_MAP_SCHEMA).getDataMapFactory
+          .asInstanceOf[SegmentPropertiesFetcher]
+        val segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment.get)
+
+        // we use task name as shard name to create the folder for this datamap
+        val shardName = CarbonTablePath.getShardName(inputSplit.getAllSplits.get(0).getBlockPath)
+        refresher = dataMapFactory.createBuilder(segment.get, shardName, segmentProperties)
+        refresher.initialize()
+
+        model.setForcedDetailRawQuery(refresher.isIndexForCarbonRawBytes)
+        val readSupport = if (refresher.isIndexForCarbonRawBytes) {
+          new RawBytesReadSupport(segmentProperties, indexColumns)
         } else {
-          firstRow = false
+          new OriginalReadSupport(indexColumns.map(_.getDataType))
         }
+        reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics)
+        reader.initialize(inputSplit, attemptContext)
+        // skip clear datamap and we will do this after rebuild
+        reader.setSkipClearDataMapAtClose(true)
+
+        var blockletId = 0
+        var firstRow = true
+        while (reader.nextKeyValue()) {
+          val rowWithPosition = reader.getCurrentValue
+          val size = rowWithPosition.length
+          val pageId = rowWithPosition(size - 2).asInstanceOf[Int]
+          val rowId = rowWithPosition(size - 1).asInstanceOf[Int]
+
+          if (!firstRow && pageId == 0 && rowId == 0) {
+            // new blocklet started, increase blockletId
+            blockletId = blockletId + 1
+          } else {
+            firstRow = false
+          }
 
-        refresher.addRow(blockletId, pageId, rowId, rowWithPosition)
-      }
+          refresher.addRow(blockletId, pageId, rowId, rowWithPosition)
+        }
 
-      refresher.finish()
+        refresher.finish()
 
-      status = true
-    } finally {
-      if (reader != null) {
-        try {
-          reader.close()
-        } catch {
-          case ex: Throwable =>
-            LOGGER.error(ex, "Failed to close reader")
+        status = true
+      } finally {
+        if (reader != null) {
+          try {
+            reader.close()
+          } catch {
+            case ex: Throwable =>
+              LOGGER.error(ex, "Failed to close reader")
+          }
         }
-      }
-      if (refresher != null) {
-        try {
-          refresher.close()
-        } catch {
-          case ex: Throwable =>
-            LOGGER.error(ex, "Failed to close index writer")
+        if (refresher != null) {
+          try {
+            refresher.close()
+          } catch {
+            case ex: Throwable =>
+              LOGGER.error(ex, "Failed to close index writer")
+          }
         }
       }
     }
@@ -363,7 +366,7 @@ class IndexDataMapRebuildRDD[K, V](
 
       override def next(): (K, V) = {
         finished = true
-        result.getKey(split.index.toString, (segment.getSegmentNo, status))
+        result.getKey(split.index.toString, (segmentId, status))
       }
     }
   }
@@ -386,7 +389,7 @@ class IndexDataMapRebuildRDD[K, V](
 
     CarbonInputFormat.setSegmentsToAccess(
       conf,
-      Segment.toSegmentList(Array(segment.getSegmentNo), null))
+      List(segment).asJava)
 
     CarbonInputFormat.setColumnProjection(
       conf,
@@ -408,7 +411,7 @@ class IndexDataMapRebuildRDD[K, V](
 
     CarbonInputFormat.setSegmentsToAccess(
      job.getConfiguration,
-      Segment.toSegmentList(segments.map(_.getSegmentNo).toArray, null))
+      segments.toList.asJava)
 
     CarbonInputFormat.setTableInfo(
       job.getConfiguration,
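
----------------------------------------------------------------------

The shape of the fix can be seen outside the Carbon codebase. Input splits
are serialized from the driver to the executors, and the readCommitScope
carried by a Segment does not survive that trip; keying on the segment
number and recovering the driver-side Segment restores the scope. The
sketch below is a minimal stand-alone Scala illustration of that pattern,
not CarbonData API: Segment, ReadCommittedScope and driverSegments are
simplified stand-ins.

object SegmentLookupSketch extends App {

  // Simplified stand-ins, not the CarbonData classes.
  trait ReadCommittedScope { def segmentDir: String }
  final case class Segment(segmentNo: String, readCommitScope: Option[ReadCommittedScope])

  // Driver-side segments keep a live scope.
  val driverSegments: Seq[Segment] = Seq(
    Segment("0", Some(new ReadCommittedScope { val segmentDir = "/store/t/Fact/Part0/Segment_0" })),
    Segment("1", Some(new ReadCommittedScope { val segmentDir = "/store/t/Fact/Part0/Segment_1" })))

  // The copy deserialized from the input split on an executor arrives
  // without its scope; None models the null readCommitScope.
  val segmentFromSplit = Segment("1", None)

  // The fix pattern: use the segment number as the stable key and recover
  // the original object instead of dereferencing the deserialized copy.
  val segment: Option[Segment] =
    driverSegments.find(_.segmentNo == segmentFromSplit.segmentNo)

  segment match {
    case Some(s) => println(s"rebuild datamap under ${s.readCommitScope.get.segmentDir}")
    case None    => println("segment no longer exists; skip this split")
  }
}

The same reasoning covers the two setSegmentsToAccess changes at the end of
the diff: Segment.toSegmentList(..., null) rebuilt Segment objects with a
null readCommitScope, while List(segment).asJava and segments.toList.asJava
pass the original objects through with their scope intact.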