Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1470#discussion_r149068014
--- Diff:
integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
---
@@ -210,8 +247,18 @@ class CarbonScanRDD(
inputMetricsStats.initBytesReadCallback(context, inputSplit)
val iterator = if (inputSplit.getAllSplits.size() > 0) {
val model = format.getQueryModel(inputSplit, attemptContext)
- val reader = {
- if (vectorReader) {
+ val reader: RecordReader[Void, Object] = {
+ if (inputSplit.isStream) {
+ DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+ val inputFormat = new CarbonStreamInputFormat
+ val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
+ .asInstanceOf[CarbonStreamRecordReader]
+ streamReader.setVectorReader(vectorReader)
+ model.setStatisticsRecorder(
+ CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
+ streamReader.setQueryModel(model)
+ streamReader
+ } else if (vectorReader) {
--- End diff --
It would be better to use a plain `else` here instead of `else if`, so that the
`else` block handles the columnar format.
---