This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 1345154 [CARBONDATA-3495] Fix Insert into Complex data type of Binary
failure with Carbon & SparkFileFormat
1345154 is described below
commit 13451547a7e5c6169d8ce375b152013a4f6b45bb
Author: Indhumathi27 <[email protected]>
AuthorDate: Thu Aug 22 16:01:49 2019 +0530
[CARBONDATA-3495] Fix Insert into Complex data type of Binary
failure with Carbon & SparkFileFormat
Problem:
Insert into a complex data type (Array/Struct/Map) of the binary data
type fails with "Invalid data type name", because binary nested within
complex data types is not handled.
Solution:
Handle the binary data type so that it works inside complex data types.
This closes #3361
---
.../core/datastore/page/ComplexColumnPage.java | 1 +
.../apache/carbondata/core/util/DataTypeUtil.java | 3 +
.../src/test/resources/complexbinary.csv | 3 +
.../complexType/TestComplexDataType.scala | 114 +++++++++++++++++++++
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 2 +
.../SparkCarbonDataSourceBinaryTest.scala | 88 ++++++++++++++++
.../processing/datatypes/PrimitiveDataType.java | 3 +
.../org/apache/carbondata/sdk/file/ImageTest.java | 41 ++++++++
8 files changed, 255 insertions(+)
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
index 921ae50..c4f8849 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -124,6 +124,7 @@ public class ComplexColumnPage {
DataTypes.isMapType(dataType) ||
(dataType == DataTypes.STRING) ||
(dataType == DataTypes.VARCHAR) ||
+ (dataType == DataTypes.BINARY) ||
(dataType == DataTypes.DATE) ||
DataTypes.isDecimal(dataType))))) {
// For all these above condition the ColumnPage should be Taken as
BYTE_ARRAY
diff --git
a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 9aea579..adb63cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -530,6 +530,7 @@ public final class DataTypeUtil {
public static boolean isFixedSizeDataType(DataType dataType) {
if (dataType == DataTypes.STRING ||
dataType == DataTypes.VARCHAR ||
+ dataType == DataTypes.BINARY ||
DataTypes.isDecimal(dataType)) {
return false;
} else {
@@ -1019,6 +1020,8 @@ public final class DataTypeUtil {
return DataTypes.BYTE_ARRAY;
} else if (DataTypes.BYTE_ARRAY.getName().equalsIgnoreCase(name)) {
return DataTypes.BYTE_ARRAY;
+ } else if (DataTypes.BINARY.getName().equalsIgnoreCase(name)) {
+ return DataTypes.BINARY;
} else if (name.equalsIgnoreCase("decimal")) {
return DataTypes.createDefaultDecimalType();
} else if (name.equalsIgnoreCase("array")) {
diff --git a/integration/spark-common-test/src/test/resources/complexbinary.csv
b/integration/spark-common-test/src/test/resources/complexbinary.csv
new file mode 100644
index 0000000..3870f5f
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/complexbinary.csv
@@ -0,0 +1,3 @@
+1,true,abc,binary1$binary2,binary1,1&binary1
+2,false,abcd,binary11$binary12,binary11,1&binary2
+3,true,abcde,binary13$binary13,binary13,1&binary3
\ No newline at end of file
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
index b5f77c2..9d6b4d1 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
@@ -1013,4 +1013,118 @@ class TestComplexDataType extends QueryTest with
BeforeAndAfterAll {
checkAnswer(sql("select
id,name,structField.intval,name,structField.stringval from
table1"),Seq(Row(null,"aaa",23,"aaa","bb")))
}
+ test("test array of binary data type") {
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists hive_table")
+ sql("create table if not exists hive_table(id int, label boolean, name
string," +
+ "binaryField array<binary>, autoLabel boolean) row format delimited
fields terminated by ','")
+ sql("insert into hive_table values(1,true,'abc',array('binary'),false)")
+ sql("create table if not exists carbon_table(id int, label boolean, name
string," +
+ "binaryField array<binary>, autoLabel boolean) stored by 'carbondata'")
+ sql("insert into carbon_table values(1,true,'abc',array('binary'),false)")
+ checkAnswer(sql("SELECT binaryField[0] FROM carbon_table"),
+ sql("SELECT binaryField[0] FROM hive_table"))
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists hive_table")
+ }
+
+ test("test struct of binary data type") {
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists parquet_table")
+ sql("create table if not exists parquet_table(id int, label boolean, name
string," +
+ "binaryField struct<b:binary>, autoLabel boolean) using parquet")
+ sql("insert into parquet_table
values(1,true,'abc',named_struct('b','binary'),false)")
+ sql("create table if not exists carbon_table(id int, label boolean, name
string," +
+ "binaryField struct<b:binary>, autoLabel boolean) stored by
'carbondata'")
+ sql("insert into carbon_table
values(1,true,'abc',named_struct('b','binary'),false)")
+ sql("SELECT binaryField.b FROM carbon_table").show(false)
+ checkAnswer(sql("SELECT binaryField.b FROM carbon_table"),
+ sql("SELECT binaryField.b FROM parquet_table"))
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists hive_table")
+ }
+
+ test("test map of binary data type") {
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists hive_table")
+ sql("create table if not exists hive_table(id int, label boolean, name
string," +
+ "binaryField map<int, binary>, autoLabel boolean) row format delimited
fields terminated by ','")
+ sql("insert into hive_table values(1,true,'abc',map(1,'binary'),false)")
+ sql("create table if not exists carbon_table(id int, label boolean, name
string," +
+ "binaryField map<int, binary>, autoLabel boolean) stored by
'carbondata'")
+ sql("insert into carbon_table values(1,true,'abc',map(1,'binary'),false)")
+ checkAnswer(sql("SELECT binaryField[1] FROM carbon_table"),
+ sql("SELECT binaryField[1] FROM hive_table"))
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists hive_table")
+ }
+
+ test("test map of array and struct binary data type") {
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists parquet_table")
+ sql("create table if not exists parquet_table(id int, label boolean, name
string," +
+ "binaryField1 map<int, array<binary>>, binaryField2 map<int,
struct<b:binary>> ) " +
+ "using parquet")
+ sql("insert into parquet_table
values(1,true,'abc',map(1,array('binary')),map(1," +
+ "named_struct('b','binary')))")
+ sql("create table if not exists carbon_table(id int, label boolean, name
string," +
+ "binaryField1 map<int, array<binary>>, binaryField2 map<int,
struct<b:binary>> ) " +
+ "stored by 'carbondata'")
+ sql("insert into carbon_table
values(1,true,'abc',map(1,array('binary')),map(1," +
+ "named_struct('b','binary')))")
+ checkAnswer(sql("SELECT binaryField1[1][1] FROM carbon_table"),
+ sql("SELECT binaryField1[1][1] FROM parquet_table"))
+ checkAnswer(sql("SELECT binaryField2[1].b FROM carbon_table"),
+ sql("SELECT binaryField2[1].b FROM parquet_table"))
+ sql("drop table if exists hive_table")
+ sql("drop table if exists carbon_table")
+ }
+
+ test("test of array of struct and struct of array of binary data type") {
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists hive_table")
+ sql("create table if not exists hive_table(id int, label boolean, name
string," +
+ "binaryField1 array<struct<b1:binary>>, binaryField2
struct<b2:array<binary>> ) " +
+ "row format delimited fields terminated by ','")
+ sql("insert into hive_table
values(1,true,'abc',array(named_struct('b1','binary'))," +
+ "named_struct('b2',array('binary')))")
+ sql("create table if not exists carbon_table(id int, label boolean, name
string," +
+ "binaryField1 array<struct<b1:binary>>, binaryField2
struct<b2:array<binary>> ) " +
+ "stored by 'carbondata'")
+ sql("insert into carbon_table
values(1,true,'abc',array(named_struct('b1','binary'))," +
+ "named_struct('b2',array('binary')))")
+ checkAnswer(sql("SELECT binaryField1[1].b1 FROM carbon_table"),
+ sql("SELECT binaryField1[1].b1 FROM hive_table"))
+ checkAnswer(sql("SELECT binaryField2.b2[0] FROM carbon_table"),
+ sql("SELECT binaryField2.b2[0] FROM hive_table"))
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists hive_table")
+ }
+
+ test("test dataload to complex of binary type column using load ddl ") {
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists hive_table")
+ sql("create table if not exists hive_table(id int, label boolean, name
string," +
+ "binaryField1 array<binary>, binaryField2 struct<b2:binary>,
binaryField3 map<int," +
+ "binary>) row format delimited fields terminated by ','")
+ sql(
+ "insert into hive_table values(1,true,'abc',array('binary1','binary2'),
named_struct('b2'," +
+ "'binary1'), map(1,'binary1'))")
+ sql("create table if not exists carbon_table(id int, label boolean, name
string," +
+ "binaryField1 array<binary>, binaryField2 struct<b2:binary>,
binaryField3 map<int,binary>) " +
+ "stored by 'carbondata'")
+ sql(
+ "load data inpath '" + resourcesPath + "/complexbinary.csv' into table
carbon_table options" +
+ "('delimiter'=',',
'quotechar'='\\','fileheader'='id,label,name,binaryField1,binaryField2," +
+ "binaryField3','complex_delimiter_level_1'='$',
'complex_delimiter_level_2'='&')")
+ checkAnswer(sql("SELECT binaryField1[0] FROM carbon_table where id=1"),
+ sql("SELECT binaryField1[0] FROM hive_table where id=1"))
+ checkAnswer(sql("SELECT binaryField2.b2 FROM carbon_table where id=1"),
+ sql("SELECT binaryField2.b2 FROM hive_table where id=1"))
+ checkAnswer(sql("SELECT binaryField3[1] FROM carbon_table where id=1"),
+ sql("SELECT binaryField3[1] FROM hive_table where id=1"))
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists hive_table")
+ }
+
}
diff --git
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 3d3b89d..c331532 100644
---
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -1511,6 +1511,8 @@ abstract class CarbonDDLSqlParser extends
AbstractCarbonSparkSQLParser {
field.dataType.getOrElse("NIL") match {
case "String" => Field(parentName + "." + field.column, Some("String"),
Some(parentName + "." + field.name.getOrElse(None)), Some(null),
parentName)
+ case "binary" => Field(parentName + "." + field.column, Some("Binary"),
+ Some(parentName + "." + field.name.getOrElse(None)), Some(null),
parentName)
case "SmallInt" => Field(parentName + "." + field.column,
Some("SmallInt"),
Some(parentName + "." + field.name.getOrElse(None)), Some(null),
parentName)
case "Integer" => Field(parentName + "." + field.column, Some("Integer"),
diff --git
a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
index 9ebf54f..9a3a9f0 100644
---
a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
+++
b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.carbondata.datasource
import java.io.File
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.sdk.util.BinaryUtil
@@ -24,6 +25,7 @@ import org.apache.commons.codec.binary.{Base64, Hex}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.carbondata.datasource.TestUtil._
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.util.SparkUtil
import org.scalatest.{BeforeAndAfterAll, FunSuite}
@@ -635,4 +637,90 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite
with BeforeAndAfterAll {
assert(exception.getMessage.contains("Operation not allowed: DELETE
FROM"))
}
+ test("test array of binary data type with sparkfileformat ") {
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists parquet_table")
+ sql("create table if not exists carbon_table(id int, label boolean,
name string," +
+ "binaryField array<binary>, autoLabel boolean) using carbon")
+ sql("insert into carbon_table
values(1,true,'abc',array('binary'),false)")
+ sql("create table if not exists parquet_table(id int, label boolean,
name string," +
+ "binaryField array<binary>, autoLabel boolean) using parquet")
+ sql("insert into parquet_table
values(1,true,'abc',array('binary'),false)")
+ checkAnswer(sql("SELECT binaryField[0] FROM carbon_table"),
+ sql("SELECT binaryField[0] FROM parquet_table"))
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists parquet_table")
+ }
+
+ test("test struct of binary data type with sparkfileformat ") {
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists parquet_table")
+ sql("create table if not exists carbon_table(id int, label boolean,
name string," +
+ "binaryField struct<b:binary>, autoLabel boolean) using carbon")
+ sql("insert into carbon_table
values(1,true,'abc',named_struct('b','binary'),false)")
+ sql("create table if not exists parquet_table(id int, label boolean,
name string," +
+ "binaryField struct<b:binary>, autoLabel boolean) using parquet")
+ sql("insert into parquet_table
values(1,true,'abc',named_struct('b','binary'),false)")
+ checkAnswer(sql("SELECT binaryField.b FROM carbon_table"),
+ sql("SELECT binaryField.b FROM parquet_table"))
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists parquet_table")
+ }
+
+ test("test map of binary data type with sparkfileformat") {
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists parquet_table")
+ sql("create table if not exists parquet_table(id int, label boolean,
name string," +
+ "binaryField map<int, binary>, autoLabel boolean) using parquet")
+ sql("insert into parquet_table
values(1,true,'abc',map(1,'binary'),false)")
+ sql("create table if not exists carbon_table(id int, label boolean,
name string," +
+ "binaryField map<int, binary>, autoLabel boolean) using carbon")
+ sql("insert into carbon_table
values(1,true,'abc',map(1,'binary'),false)")
+ checkAnswer(sql("SELECT binaryField[1] FROM carbon_table"),
+ sql("SELECT binaryField[1] FROM parquet_table"))
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists parquet_table")
+ }
+
+ test("test map of array and struct binary data type with sparkfileformat")
{
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists parquet_table")
+ sql("create table if not exists parquet_table(id int, label boolean,
name string," +
+ "binaryField1 map<int, array<binary>>, binaryField2 map<int,
struct<b:binary>> ) " +
+ "using parquet")
+ sql("insert into parquet_table
values(1,true,'abc',map(1,array('binary')),map(1," +
+ "named_struct('b','binary')))")
+ sql("create table if not exists carbon_table(id int, label boolean,
name string," +
+ "binaryField1 map<int, array<binary>>, binaryField2 map<int,
struct<b:binary>> ) " +
+ "using carbon")
+ sql("insert into carbon_table
values(1,true,'abc',map(1,array('binary')),map(1," +
+ "named_struct('b','binary')))")
+ checkAnswer(sql("SELECT binaryField1[1][1] FROM carbon_table"),
+ sql("SELECT binaryField1[1][1] FROM parquet_table"))
+ checkAnswer(sql("SELECT binaryField2[1].b FROM carbon_table"),
+ sql("SELECT binaryField2[1].b FROM parquet_table"))
+ sql("drop table if exists carbon_table")
+ }
+
+ test("test of array of struct and struct of array of binary data type with
sparkfileformat") {
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists parquet_table")
+ sql("create table if not exists parquet_table(id int, label boolean,
name string," +
+ "binaryField1 array<struct<b1:binary>>, binaryField2
struct<b2:array<binary>> ) " +
+ "using parquet")
+ sql("insert into parquet_table
values(1,true,'abc',array(named_struct('b1','binary'))," +
+ "named_struct('b2',array('binary')))")
+ sql("create table if not exists carbon_table(id int, label boolean,
name string," +
+ "binaryField1 array<struct<b1:binary>>, binaryField2
struct<b2:array<binary>> ) " +
+ "using carbon")
+ sql("insert into carbon_table
values(1,true,'abc',array(named_struct('b1','binary'))," +
+ "named_struct('b2',array('binary')))")
+ checkAnswer(sql("SELECT binaryField1[1].b1 FROM carbon_table"),
+ sql("SELECT binaryField1[1].b1 FROM parquet_table"))
+ checkAnswer(sql("SELECT binaryField2.b2[0] FROM carbon_table"),
+ sql("SELECT binaryField2.b2[0] FROM parquet_table"))
+ sql("drop table if exists carbon_table")
+ sql("drop table if exists parquet_table")
+ }
+
}
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 200a9f6..9504974 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -378,6 +378,9 @@ public class PrimitiveDataType implements
GenericDataType<Object> {
} else {
value = ByteUtil.toXorBytes(Long.parseLong(parsedValue));
}
+ } else if
(this.carbonDimension.getDataType().equals(DataTypes.BINARY)) {
+ value =
DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(input,
+ this.carbonDimension.getDataType());
} else {
value =
DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue,
this.carbonDimension.getDataType(), dateFormat);
diff --git
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
index 30a881b..6f90155 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
@@ -22,6 +22,7 @@ import junit.framework.TestCase;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import
org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
@@ -1127,4 +1128,44 @@ public class ImageTest extends TestCase {
}
}
+ @Test public void testBinaryWithComplexType()
+ throws IOException, InvalidLoadOptionException, InterruptedException {
+ int num = 1;
+ int rows = 1;
+ String path = "./target/binary";
+ try {
+ FileUtils.deleteDirectory(new File(path));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ Field[] fields = new Field[4];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+ fields[2] = new Field("arrayField",
DataTypes.createArrayType(DataTypes.BINARY));
+ ArrayList<StructField> structFields = new ArrayList<>();
+ structFields.add(new StructField("b", DataTypes.BINARY));
+ fields[3] = new Field("structField",
DataTypes.createStructType(structFields));
+
+ // read and write image data
+ for (int j = 0; j < num; j++) {
+ CarbonWriter writer =
CarbonWriter.builder().outputPath(path).withCsvInput(new Schema(fields))
+ .writtenBy("BinaryExample").withPageSizeInMb(1).build();
+
+ for (int i = 0; i < rows; i++) {
+ // write data
+ writer.write(new String[] { "robot" + (i % 10), String.valueOf(i),
"binary1", "binary2" });
+ }
+ writer.close();
+ }
+ CarbonReader reader = CarbonReader.builder(path, "_temp").build();
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ Object[] arrayResult = (Object[]) row[1];
+ Object[] structResult = (Object[]) row[2];
+ assert (new String((byte[]) arrayResult[0]).equalsIgnoreCase("binary1"));
+ assert (new String((byte[])
structResult[0]).equalsIgnoreCase("binary2"));
+ }
+ reader.close();
+ }
+
}