This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 00f64c6 [CARBONDATA-3565] Fix complex binary data broken issue when loading dataframe data
00f64c6 is described below
commit 00f64c6f32710a0e2beddfdb6403b2faa879e031
Author: IceMimosa <[email protected]>
AuthorDate: Tue Jan 7 13:24:57 2020 +0800
[CARBONDATA-3565] Fix complex binary data broken issue when loading dataframe data
Why is this PR needed?
When binary data is written with DataOutputStream#writeDouble (and similar methods) and then loaded into a table through a Spark DataFrame (SQL), the data is corrupted (EF BF BD) when read back.
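For background, a minimal, self-contained sketch of why the EF BF BD bytes appear (an illustration of the general UTF-8 round-trip problem, not CarbonData's exact internal path): the raw bytes produced by DataOutputStream are usually not valid UTF-8, so converting them to a String and back replaces the invalid sequences with U+FFFD, which re-encodes as EF BF BD.

    import java.io.{ByteArrayOutputStream, DataOutputStream}
    import java.nio.charset.StandardCharsets

    object ReplacementCharDemo {
      def main(args: Array[String]): Unit = {
        // Produce binary data the same way the new test case does.
        val baos = new ByteArrayOutputStream()
        val dos = new DataOutputStream(baos)
        dos.writeInt(123)
        dos.writeDouble(0.998123D)
        val original = baos.toByteArray

        // Lossy round trip: byte sequences that are not valid UTF-8 are
        // replaced with U+FFFD, which encodes back to EF BF BD.
        val roundTripped = new String(original, StandardCharsets.UTF_8)
          .getBytes(StandardCharsets.UTF_8)

        // The two hex dumps differ for values like these, showing the corruption.
        println(original.map(b => f"${b & 0xFF}%02X").mkString(" "))
        println(roundTripped.map(b => f"${b & 0xFF}%02X").mkString(" "))
      }
    }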
What changes were proposed in this PR?
If the data is already a byte[], there is no need to convert it to a string and then decode it back to byte[] again.
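As a rough sketch of the pattern (a hypothetical helper, not the actual CarbonData method; toCsvString stands in for the existing string conversion such as CarbonScalaUtil.getString): binary values are passed through untouched when the default binary decoder is in effect, and every other column still takes the string conversion used by the CSV loading flow.

    object BinaryPassThroughSketch {
      // Hypothetical helper (not a CarbonData API) illustrating the idea behind the fix.
      def toLoadColumn(value: Any, isDefaultBinaryDecoder: Boolean)
                      (toCsvString: Any => String): AnyRef = {
        value match {
          // Keep byte[] as-is: the byte[] -> String -> byte[] round trip is what corrupts the data.
          case bytes: Array[Byte] if isDefaultBinaryDecoder => bytes
          // All other columns still go through the CSV-style string conversion.
          case other => toCsvString(other)
        }
      }
    }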
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3430
---
.../spark/rdd/NewCarbonDataLoadRDD.scala | 34 ++++++++++++++++------
.../testsuite/binary/TestBinaryDataType.scala | 34 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 9 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 23a2683..7caf644 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.{CollectionAccumulator, SparkUtil}
import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
@@ -351,7 +351,7 @@ class NewDataFrameLoaderRDD[K, V](
/**
* This class wrap Scala's Iterator to Java's Iterator.
- * It also convert all columns to string data to use csv data loading flow.
+ * It also convert all columns to string data (exclude binary type) to use csv data loading flow.
*
* @param rddIter
* @param carbonLoadModel
@@ -378,6 +378,9 @@ class NewRddIterator(rddIter: Iterator[Row],
private val isComplexTypeMapping =
carbonLoadModel.getCarbonDataLoadSchema
.getCarbonTable.getCreateOrderColumn.asScala.map(_.isComplex())
+ private val isDefaultBinaryDecoder = carbonLoadModel.getBinaryDecoder == null ||
+   CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_DEFAULT.equals(
+     carbonLoadModel.getBinaryDecoder)
def hasNext: Boolean = rddIter.hasNext
def next: Array[AnyRef] = {
@@ -386,10 +389,15 @@ class NewRddIterator(rddIter: Iterator[Row],
val len = columns.length
var i = 0
while (i < len) {
- columns(i) = CarbonScalaUtil.getString(row, i, carbonLoadModel, serializationNullFormat,
-   complexDelimiters, timeStampFormat, dateFormat,
-   isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i),
-   isComplexType = i < isComplexTypeMapping.size && isComplexTypeMapping(i))
+ columns(i) = row.get(i) match {
+   case bs if bs.isInstanceOf[Array[Byte]] && isDefaultBinaryDecoder =>
+     bs.asInstanceOf[Array[Byte]]
+   case _ =>
+     CarbonScalaUtil.getString(row, i, carbonLoadModel, serializationNullFormat,
+       complexDelimiters, timeStampFormat, dateFormat,
+       isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i),
+       isComplexType = i < isComplexTypeMapping.size && isComplexTypeMapping(i))
+ }
i += 1
}
columns
@@ -438,6 +446,9 @@ class LazyRddIterator(serializer: SerializerInstance,
r.isDefined && r.get
})
}
+ private val isDefaultBinaryDecoder = carbonLoadModel.getBinaryDecoder == null ||
+   CarbonLoadOptionConstants.CARBON_OPTIONS_BINARY_DECODER_DEFAULT.equals(
+     carbonLoadModel.getBinaryDecoder)
private var rddIter: Iterator[Row] = null
private var uninitialized = true
@@ -460,9 +471,14 @@ class LazyRddIterator(serializer: SerializerInstance,
val row = rddIter.next()
val columns = new Array[AnyRef](row.length)
for (i <- 0 until columns.length) {
- columns(i) = CarbonScalaUtil.getString(row, i, carbonLoadModel, serializationNullFormat,
-   complexDelimiters, timeStampFormat, dateFormat,
-   isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
+ columns(i) = row.get(i) match {
+   case bs if bs.isInstanceOf[Array[Byte]] && isDefaultBinaryDecoder =>
+     bs.asInstanceOf[Array[Byte]]
+   case _ =>
+     CarbonScalaUtil.getString(row, i, carbonLoadModel, serializationNullFormat,
+       complexDelimiters, timeStampFormat, dateFormat,
+       isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
+ }
}
columns
}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
index 4cd2b66..ae0eea8 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.integration.spark.testsuite.binary
+import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.util.Arrays
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -1665,6 +1666,39 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE binaryTable")
}
+ test("test complex binary insert into table") {
+ CarbonProperties.getInstance()
+   .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
+ import sqlContext.implicits._
+ sql("DROP TABLE IF EXISTS binaryTable")
+ sql("DROP TABLE IF EXISTS binaryTable_carbondata")
+ sql("DROP TABLE IF EXISTS binaryTable_carbon")
+ sql(s"""CREATE TABLE IF NOT EXISTS binaryTable( binaryField binary )
STORED AS carbondata""")
+ sql(s"""CREATE TABLE IF NOT EXISTS binaryTable_carbondata( binaryField
binary ) USING CARBONDATA""")
+ sql(s"""CREATE TABLE IF NOT EXISTS binaryTable_carbon( binaryField
binary ) USING CARBON""")
+ // create binary data
+ val baos = new ByteArrayOutputStream()
+ val dos = new DataOutputStream(baos)
+ dos.writeInt(123)
+ dos.writeChars("abc")
+ dos.writeDouble(0.998123D)
+ dos.writeChars("def")
+ val bytes = baos.toByteArray
+
+ Seq(bytes).toDF("binaryField").write.insertInto("binaryTable")
+ Seq(bytes).toDF("binaryField").write.insertInto("binaryTable_carbondata")
+ Seq(bytes).toDF("binaryField").write.insertInto("binaryTable_carbon")
+ checkAnswer(sql("SELECT * FROM binaryTable"), Seq(Row(bytes)))
+ checkAnswer(sql("SELECT * FROM binaryTable_carbondata"),
Seq(Row(bytes)))
+ checkAnswer(sql("SELECT * FROM binaryTable_carbon"), Seq(Row(bytes)))
+ sql("DROP TABLE binaryTable")
+ sql("DROP TABLE binaryTable_carbondata")
+ sql("DROP TABLE binaryTable_carbon")
+ CarbonProperties.getInstance()
+   .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
+     CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT_DEFAULT)
+ }
+
override def afterAll: Unit = {
sqlContext.sparkSession.conf.unset("hive.exec.dynamic.partition.mode")
sql("DROP TABLE IF EXISTS binaryTable")