Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1470#discussion_r149067739
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
@@ -82,8 +84,43 @@ class CarbonScanRDD(
// get splits
val splits = format.getSplits(job)
- val result = distributeSplits(splits)
- result
+
+ // separate split
+ // 1. for batch splits, invoke distributeSplits method to create partitions
+ // 2. for stream splits, create partition for each split by default
+ val columnarSplits = new ArrayList[InputSplit]()
+ val streamSplits = new ArrayBuffer[InputSplit]()
+ for(i <- 0 until splits.size()) {
+ val carbonInputSplit = splits.get(i).asInstanceOf[CarbonInputSplit]
+ if ("row-format".equals(carbonInputSplit.getFormat)) {
+ streamSplits += splits.get(i)
+ } else {
+ columnarSplits.add(splits.get(i))
+ }
+ }
+ val batchPartitions = distributeSplits(columnarSplits)
+ if (streamSplits.isEmpty) {
+ batchPartitions
+ } else {
+ val index = batchPartitions.length
+ val streamPartitions: ArrayBuffer[Partition] =
+ streamSplits.zipWithIndex.map { splitWithIndex =>
+ val multiBlockSplit =
+ new CarbonMultiBlockSplit(identifier,
+ Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
+ splitWithIndex._1.getLocations)
+ multiBlockSplit.setStream(true)
--- End diff --
I think you can set the same DATA_FILE_FORMAT enum in `multiBlockSplit`
---