This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 181c1b3  [CARBONDATA-3887] Fixed insert failure for global sort null data
181c1b3 is described below
commit 181c1b3c8d20678db9fd205f8afd376c32d3fb7d
Author: kunal642 <[email protected]>
AuthorDate: Fri Jul 3 13:31:05 2020 +0530
[CARBONDATA-3887] Fixed insert failure for global sort null data
Why is this PR needed?
Data load fails with an "Unsupported dataType: String" exception when null data
is loaded into a string column of a global_sort table, because the global sort
flow has no handling for null values in string columns.
What changes were proposed in this PR?
Added a check for null data and handled it in the global sort flow.
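To illustrate the fix, here is a minimal sketch of the null-handling pattern for a
string column in the global sort conversion step. It mirrors the diff below but is
simplified and not the exact CommonUtil.scala code; row, data, i and fields stand in
for the method's locals:

    fields(i).dataType match {
      case StringType =>
        // Previously this branch was only entered when !row.isNullAt(i), so a null
        // string value had no byte representation and the load failed with
        // "Unsupported dataType: String".
        data(i) = if (row.isNullAt(i)) {
          DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(null, DataTypes.STRING)
        } else {
          DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i), DataTypes.STRING)
        }
      case other =>
        data(i) = row.get(i, other)
    }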
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3822
---
 .../apache/carbondata/spark/util/CommonUtil.scala | 93 +++++++++++-----------
 .../dataload/TestGlobalSortDataLoad.scala         | 12 +++
2 files changed, 60 insertions(+), 45 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 7e98605..dcbc9d2 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -921,52 +921,55 @@ object CommonUtil {
     var i = 0
     val fieldTypesLen = fields.length
     while (i < fieldTypesLen) {
-      if (!row.isNullAt(i)) {
-        fields(i).dataType match {
-          case StringType =>
-            data(i) = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i),
+      fields(i).dataType match {
+        case StringType =>
+          data(i) = if (row.isNullAt(i)) {
+            DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(null,
               DataTypes.STRING)
-          case d: DecimalType =>
-            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
-          case arrayType : ArrayType =>
-            val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType)
-            // convert carbon complex object to byte array
-            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
-            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
-            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
-              .writeByteArray(result.asInstanceOf[ArrayObject],
-                dataOutputStream,
-                badRecordLogHolder,
-                true)
-            dataOutputStream.close()
-            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
-          case structType : StructType =>
-            val result = convertSparkComplexTypeToCarbonObject(row.get(i, structType), structType)
-            // convert carbon complex object to byte array
-            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
-            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
-            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[StructDataType]
-              .writeByteArray(result.asInstanceOf[StructObject],
-                dataOutputStream,
-                badRecordLogHolder,
-                true)
-            dataOutputStream.close()
-            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
-          case mapType : MapType =>
-            val result = convertSparkComplexTypeToCarbonObject(row.get(i, mapType), mapType)
-            // convert carbon complex object to byte array
-            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
-            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
-            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
-              .writeByteArray(result.asInstanceOf[ArrayObject],
-                dataOutputStream,
-                badRecordLogHolder,
-                true)
-            dataOutputStream.close()
-            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
-          case other =>
-            data(i) = row.get(i, other)
-        }
+          } else {
+            DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i),
+              DataTypes.STRING)
+          }
+        case d: DecimalType =>
+          data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+        case arrayType: ArrayType =>
+          val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType)
+          // convert carbon complex object to byte array
+          val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+          val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+          dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+            .writeByteArray(result.asInstanceOf[ArrayObject],
+              dataOutputStream,
+              badRecordLogHolder,
+              true)
+          dataOutputStream.close()
+          data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+        case structType: StructType =>
+          val result = convertSparkComplexTypeToCarbonObject(row.get(i, structType), structType)
+          // convert carbon complex object to byte array
+          val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+          val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+          dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[StructDataType]
+            .writeByteArray(result.asInstanceOf[StructObject],
+              dataOutputStream,
+              badRecordLogHolder,
+              true)
+          dataOutputStream.close()
+          data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+        case mapType: MapType =>
+          val result = convertSparkComplexTypeToCarbonObject(row.get(i, mapType), mapType)
+          // convert carbon complex object to byte array
+          val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+          val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+          dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+            .writeByteArray(result.asInstanceOf[ArrayObject],
+              dataOutputStream,
+              badRecordLogHolder,
+              true)
+          dataOutputStream.close()
+          data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+        case other =>
+          data(i) = row.get(i, other)
       }
       i += 1
     }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index a228e21..f1851f1 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -89,6 +89,8 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     sql("DROP TABLE IF EXISTS carbon_globalsort_minor")
     sql("DROP TABLE IF EXISTS carbon_globalsort_major")
     sql("DROP TABLE IF EXISTS carbon_globalsort_custom")
+    sql("drop table if exists source")
+    sql("drop table if exists sink")
   }
 
   // ----------------------------------- Compare Result -----------------------------------
@@ -468,6 +470,16 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
       sql("SELECT * FROM carbon_localsort_difftypes ORDER BY shortField"))
   }
 
+  test("test global sort with null values") {
+    sql("drop table if exists source")
+    sql("drop table if exists sink")
+    sql("create table source(a string, b int, c int, d int, e int, f int) stored as carbondata TBLPROPERTIES('bad_record_action'='force')")
+    sql("insert into source select 'k','k', 'k','k','k', 'k'")
+    sql("create table sink (a string, b string, c int, d bigint, e double, f char(5)) stored as carbondata TBLPROPERTIES('sort_scope'='global_sort', 'sort_columns'='b,c,d,f')")
+    sql("insert into sink select * from source")
+    checkAnswer(sql("select * from sink"), Row("k", null, null, null, null, null))
+  }
+
   private def resetConf() {
     CarbonProperties.getInstance()
       .removeProperty(CarbonCommonConstants.LOAD_SORT_SCOPE)