GitHub user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2443#discussion_r200965779
--- Diff:
integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
---
@@ -163,38 +268,51 @@ class IndexDataMapRebuildRDD[K, V](
override def internalCompute(split: Partition, context: TaskContext):
Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val dataMapFactory =
- DataMapManager.get().getDataMapProvider(
- CarbonTable.buildFromTableInfo(getTableInfo), dataMapSchema,
session).getDataMapFactory
+ val carbonTable = CarbonTable.buildFromTableInfo(getTableInfo)
+ val dataMapFactory = DataMapManager.get().getDataMapProvider(
+ carbonTable, dataMapSchema, session).getDataMapFactory
var status = false
val inputMetrics = new CarbonInputMetrics
TaskMetricsMap.getInstance().registerThreadCallback()
val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+ val segment = inputSplit.getAllSplits.get(0).getSegment
inputMetrics.initBytesReadCallback(context, inputSplit)
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP,
split.index, 0)
val attemptContext = new TaskAttemptContextImpl(new Configuration(),
attemptId)
- val format = createInputFormat(attemptContext)
+ val format = createInputFormat(segment, attemptContext)
val model = format.createQueryModel(inputSplit, attemptContext)
// one query id per table
model.setQueryId(queryId)
model.setVectorReader(false)
- model.setForcedDetailRawQuery(false)
model.setRequiredRowId(true)
var reader: CarbonRecordReader[Array[Object]] = null
var refresher: DataMapBuilder = null
try {
- reader = new CarbonRecordReader(
- model, new OriginalReadSupport(indexColumns.map(_.getDataType)),
inputMetrics)
- reader.initialize(inputSplit, attemptContext)
+ val segmentPropertiesFetcher =
DataMapStoreManager.getInstance().getDataMap(carbonTable,
--- End diff --
fixed
---