This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 00f516b [CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns
00f516b is described below
commit 00f516bbfaa58efa5f2e7914cfe01bddb0bcc812
Author: ajantha-bhat <[email protected]>
AuthorDate: Sat Dec 28 16:54:06 2019 +0800
[CARBONDATA-3639][CARBONDATA-3638] Fix global sort exception in load from CSV flow with binary non-sort columns
Problem:
Global sort throws an exception when loading from CSV with binary columns that are not sort columns.
The exception is attached to the JIRA issue and a test case is added in this PR.
Cause: For the global sort flow, a CSV RDD of string rows is built from the input files. Each value of this string RDD was then converted again into a new Scala String object, and for binary columns the value length exceeded 32000, hence the exception.
Solution: For the CSV load flow, skip this extra String object conversion, since the values are already String objects (see the sketch below).
This also handles [CARBONDATA-3638].
This closes #3540
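For context, here is a minimal, self-contained sketch of the idea behind the fix. The names (StringArrayRowSketch, CsvReuseSketch) are hypothetical and simplified, not the actual CarbonData classes; it only illustrates that reusing the String values the CSV scan already produced avoids allocating a new String per field for every row.

    // Sketch only (hypothetical names): reuse existing String values instead of
    // rebuilding them for every row.
    case class StringArrayRowSketch(values: Array[String])

    object CsvReuseSketch {

      // Old behaviour, conceptually: every field is copied into a new String object.
      def rebuildFields(rows: Iterator[StringArrayRowSketch]): Iterator[Array[Object]] =
        rows.map(_.values.map(v => new String(v): Object))

      // New behaviour, conceptually: the existing Strings are passed through,
      // mirroring the values.asInstanceOf[Array[Object]] cast in the patch below.
      def reuseFields(rows: Iterator[StringArrayRowSketch]): Iterator[Array[Object]] =
        rows.map(_.values.asInstanceOf[Array[Object]])

      def main(args: Array[String]): Unit = {
        val hugeHex = "ab" * 20000 // a 40000-char field, like a large binary value
        val rows = Iterator(StringArrayRowSketch(Array("1", "1.png", hugeHex)))
        // The reuse path hands back the same String instances; no copies are made.
        println(reuseFields(rows).next().length) // prints 3
      }
    }

In the actual patch this pass-through happens in the new inputFuncForCsvRows step shown in the DataLoadProcessorStepOnSpark.scala diff below.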
---
.../testsuite/binary/TestBinaryDataType.scala | 42 ++++++++++++++++++++++
.../spark/load/DataLoadProcessBuilderOnSpark.scala | 25 ++++++++++---
.../spark/load/DataLoadProcessorStepOnSpark.scala | 29 +++++++++++++++
3 files changed, 91 insertions(+), 5 deletions(-)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 7550581..84675fc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -112,6 +112,48 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
}
}
+ test("Create table and load data with binary column with other global sort
columns") {
+ sql("DROP TABLE IF EXISTS binaryTable")
+ sql(
+ s"""
+ | CREATE TABLE IF NOT EXISTS binaryTable (
+ | id int,
+ | label boolean,
+ | name string,
+ | binaryField binary,
+ | autoLabel boolean)
+ | STORED AS CARBONDATA
+ | TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE' = 'global_sort')
+ """.stripMargin)
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$resourcesPath/binaryDataHex.csv'
+ | INTO TABLE binaryTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
+ try {
+ val df = sql("SELECT * FROM binaryTable").collect()
+ assert(3 == df.length)
+ df.foreach { each =>
+ assert(5 == each.length)
+ assert(Integer.valueOf(each(0).toString) > 0)
+ assert(each(1).toString.equalsIgnoreCase("false") || (each(1).toString.equalsIgnoreCase("true")))
+ assert(each(2).toString.contains(".png"))
+ val bytes40 = each.getAs[Array[Byte]](3).slice(0, 40)
+ val binaryName = each(2).toString
+ val expectedBytes = Hex.encodeHex(firstBytes20.get(binaryName).get)
+ assert(Arrays.equals(String.valueOf(expectedBytes).getBytes(), bytes40), "incorrect value for binary data")
+ assert(each(4).toString.equalsIgnoreCase("false") || (each(4).toString.equalsIgnoreCase("true")))
+ }
+ } catch {
+ case e: Exception =>
+ e.printStackTrace()
+ assert(false)
+ }
+ }
+
private val firstBytes20 = Map("1.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 1, 74),
"2.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 2, -11),
"3.png" -> Array[Byte](-119, 80, 78, 71, 13, 10, 26, 10, 0, 0, 0, 13, 73, 72, 68, 82, 0, 0, 1, 54)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index bb5e946..ae859c0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant}
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow}
import org.apache.carbondata.spark.util.CommonUtil
import org.apache.carbondata.store.CarbonRowReadSupport
@@ -67,10 +67,12 @@ object DataLoadProcessBuilderOnSpark {
dataFrame: Option[DataFrame],
model: CarbonLoadModel,
hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+ var isLoadFromCSV = false
val originRDD = if (dataFrame.isDefined) {
dataFrame.get.rdd
} else {
// input data from files
+ isLoadFromCSV = true
val columnCount = model.getCsvHeaderColumns.length
CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf)
.map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
@@ -90,11 +92,24 @@ object DataLoadProcessBuilderOnSpark {
val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
// 1. Input
- val inputRDD = originRDD
- .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
- .mapPartitionsWithIndex { case (index, rows) =>
- DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+ val inputRDD = if (isLoadFromCSV) {
+ // No need of wrap with NewRDDIterator, which converts object to string,
+ // as it is already a string.
+ // So, this will avoid new object creation in case of CSV global sort load for each row
+ originRDD.mapPartitionsWithIndex { case (index, rows) =>
+ DataLoadProcessorStepOnSpark.inputFuncForCsvRows(
+ rows.asInstanceOf[Iterator[StringArrayRow]],
+ index,
+ modelBroadcast,
+ inputStepRowCounter)
}
+ } else {
+ originRDD
+ .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
+ .mapPartitionsWithIndex { case (index, rows) =>
+ DataLoadProcessorStepOnSpark.inputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+ }
+ }
// 2. Convert
val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index ff0e1bf..041019a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -97,6 +97,35 @@ object DataLoadProcessorStepOnSpark {
}
}
+ def inputFuncForCsvRows(
+ rows: Iterator[StringArrayRow],
+ index: Int,
+ modelBroadcast: Broadcast[CarbonLoadModel],
+ rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+ val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+ val conf = DataLoadProcessBuilder.createConfiguration(model)
+ val rowParser = new RowParserImpl(conf.getDataFields, conf)
+ val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
+ TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+ wrapException(e, model)
+ }
+
+ new Iterator[CarbonRow] {
+ override def hasNext: Boolean = rows.hasNext
+
+ override def next(): CarbonRow = {
+ val rawRow = rows.next().values.asInstanceOf[Array[Object]]
+ val row = if (isRawDataRequired) {
+ new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+ } else {
+ new CarbonRow(rowParser.parseRow(rawRow))
+ }
+ rowCounter.add(1)
+ row
+ }
+ }
+ }
+
def internalInputFunc(
rows: Iterator[InternalRow],
index: Int,