This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 3aea196715a9954e38dccdc50824e8f6a0a75de3
Author: Indhumathi27 <indhumathi...@gmail.com>
AuthorDate: Thu Aug 22 16:01:49 2019 +0530

    [CARBONDATA-3495] Fix Insert into Complex data type of Binary failure with Carbon & SparkFileFormat

    Problem:
    Insert into a complex data type (Array/Struct/Map) of binary data type fails with "Invalid data type name", because Binary nested in complex data types is not handled.

    Solution:
    Handle the Binary data type so that it works with complex data types.

    This closes #3361
--- .../core/datastore/page/ComplexColumnPage.java | 1 + .../apache/carbondata/core/util/DataTypeUtil.java | 3 + .../src/test/resources/complexbinary.csv | 3 + .../complexType/TestComplexDataType.scala | 114 +++++++++++++++++++++ .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 2 + .../SparkCarbonDataSourceBinaryTest.scala | 88 ++++++++++++++++ .../processing/datatypes/PrimitiveDataType.java | 3 + .../org/apache/carbondata/sdk/file/ImageTest.java | 41 ++++++++ 8 files changed, 255 insertions(+) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java index 921ae50..c4f8849 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java @@ -124,6 +124,7 @@ public class ComplexColumnPage { DataTypes.isMapType(dataType) || (dataType == DataTypes.STRING) || (dataType == DataTypes.VARCHAR) || + (dataType == DataTypes.BINARY) || (dataType == DataTypes.DATE) || DataTypes.isDecimal(dataType))))) { // For all these above condition the ColumnPage should be Taken as BYTE_ARRAY diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java index 9aea579..adb63cd 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java +++ 
b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java @@ -530,6 +530,7 @@ public final class DataTypeUtil { public static boolean isFixedSizeDataType(DataType dataType) { if (dataType == DataTypes.STRING || dataType == DataTypes.VARCHAR || + dataType == DataTypes.BINARY || DataTypes.isDecimal(dataType)) { return false; } else { @@ -1019,6 +1020,8 @@ public final class DataTypeUtil { return DataTypes.BYTE_ARRAY; } else if (DataTypes.BYTE_ARRAY.getName().equalsIgnoreCase(name)) { return DataTypes.BYTE_ARRAY; + } else if (DataTypes.BINARY.getName().equalsIgnoreCase(name)) { + return DataTypes.BINARY; } else if (name.equalsIgnoreCase("decimal")) { return DataTypes.createDefaultDecimalType(); } else if (name.equalsIgnoreCase("array")) { diff --git a/integration/spark-common-test/src/test/resources/complexbinary.csv b/integration/spark-common-test/src/test/resources/complexbinary.csv new file mode 100644 index 0000000..3870f5f --- /dev/null +++ b/integration/spark-common-test/src/test/resources/complexbinary.csv @@ -0,0 +1,3 @@ +1,true,abc,binary1$binary2,binary1,1&binary1 +2,false,abcd,binary11$binary12,binary11,1&binary2 +3,true,abcde,binary13$binary13,binary13,1&binary3 \ No newline at end of file diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala index b5f77c2..9d6b4d1 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala @@ -1013,4 +1013,118 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("select id,name,structField.intval,name,structField.stringval from 
table1"),Seq(Row(null,"aaa",23,"aaa","bb"))) } + test("test array of binary data type") { + sql("drop table if exists carbon_table") + sql("drop table if exists hive_table") + sql("create table if not exists hive_table(id int, label boolean, name string," + + "binaryField array<binary>, autoLabel boolean) row format delimited fields terminated by ','") + sql("insert into hive_table values(1,true,'abc',array('binary'),false)") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField array<binary>, autoLabel boolean) stored by 'carbondata'") + sql("insert into carbon_table values(1,true,'abc',array('binary'),false)") + checkAnswer(sql("SELECT binaryField[0] FROM carbon_table"), + sql("SELECT binaryField[0] FROM hive_table")) + sql("drop table if exists carbon_table") + sql("drop table if exists hive_table") + } + + test("test struct of binary data type") { + sql("drop table if exists carbon_table") + sql("drop table if exists parquet_table") + sql("create table if not exists parquet_table(id int, label boolean, name string," + + "binaryField struct<b:binary>, autoLabel boolean) using parquet") + sql("insert into parquet_table values(1,true,'abc',named_struct('b','binary'),false)") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField struct<b:binary>, autoLabel boolean) stored by 'carbondata'") + sql("insert into carbon_table values(1,true,'abc',named_struct('b','binary'),false)") + sql("SELECT binaryField.b FROM carbon_table").show(false) + checkAnswer(sql("SELECT binaryField.b FROM carbon_table"), + sql("SELECT binaryField.b FROM parquet_table")) + sql("drop table if exists carbon_table") + sql("drop table if exists hive_table") + } + + test("test map of binary data type") { + sql("drop table if exists carbon_table") + sql("drop table if exists hive_table") + sql("create table if not exists hive_table(id int, label boolean, name string," + + "binaryField map<int, binary>, 
autoLabel boolean) row format delimited fields terminated by ','") + sql("insert into hive_table values(1,true,'abc',map(1,'binary'),false)") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField map<int, binary>, autoLabel boolean) stored by 'carbondata'") + sql("insert into carbon_table values(1,true,'abc',map(1,'binary'),false)") + checkAnswer(sql("SELECT binaryField[1] FROM carbon_table"), + sql("SELECT binaryField[1] FROM hive_table")) + sql("drop table if exists carbon_table") + sql("drop table if exists hive_table") + } + + test("test map of array and struct binary data type") { + sql("drop table if exists carbon_table") + sql("drop table if exists parquet_table") + sql("create table if not exists parquet_table(id int, label boolean, name string," + + "binaryField1 map<int, array<binary>>, binaryField2 map<int, struct<b:binary>> ) " + + "using parquet") + sql("insert into parquet_table values(1,true,'abc',map(1,array('binary')),map(1," + + "named_struct('b','binary')))") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField1 map<int, array<binary>>, binaryField2 map<int, struct<b:binary>> ) " + + "stored by 'carbondata'") + sql("insert into carbon_table values(1,true,'abc',map(1,array('binary')),map(1," + + "named_struct('b','binary')))") + checkAnswer(sql("SELECT binaryField1[1][1] FROM carbon_table"), + sql("SELECT binaryField1[1][1] FROM parquet_table")) + checkAnswer(sql("SELECT binaryField2[1].b FROM carbon_table"), + sql("SELECT binaryField2[1].b FROM parquet_table")) + sql("drop table if exists hive_table") + sql("drop table if exists carbon_table") + } + + test("test of array of struct and struct of array of binary data type") { + sql("drop table if exists carbon_table") + sql("drop table if exists hive_table") + sql("create table if not exists hive_table(id int, label boolean, name string," + + "binaryField1 array<struct<b1:binary>>, binaryField2 
struct<b2:array<binary>> ) " + + "row format delimited fields terminated by ','") + sql("insert into hive_table values(1,true,'abc',array(named_struct('b1','binary'))," + + "named_struct('b2',array('binary')))") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField1 array<struct<b1:binary>>, binaryField2 struct<b2:array<binary>> ) " + + "stored by 'carbondata'") + sql("insert into carbon_table values(1,true,'abc',array(named_struct('b1','binary'))," + + "named_struct('b2',array('binary')))") + checkAnswer(sql("SELECT binaryField1[1].b1 FROM carbon_table"), + sql("SELECT binaryField1[1].b1 FROM hive_table")) + checkAnswer(sql("SELECT binaryField2.b2[0] FROM carbon_table"), + sql("SELECT binaryField2.b2[0] FROM hive_table")) + sql("drop table if exists carbon_table") + sql("drop table if exists hive_table") + } + + test("test dataload to complex of binary type column using load ddl ") { + sql("drop table if exists carbon_table") + sql("drop table if exists hive_table") + sql("create table if not exists hive_table(id int, label boolean, name string," + + "binaryField1 array<binary>, binaryField2 struct<b2:binary>, binaryField3 map<int," + + "binary>) row format delimited fields terminated by ','") + sql( + "insert into hive_table values(1,true,'abc',array('binary1','binary2'), named_struct('b2'," + + "'binary1'), map(1,'binary1'))") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField1 array<binary>, binaryField2 struct<b2:binary>, binaryField3 map<int,binary>) " + + "stored by 'carbondata'") + sql( + "load data inpath '" + resourcesPath + "/complexbinary.csv' into table carbon_table options" + + "('delimiter'=',', 'quotechar'='\\','fileheader'='id,label,name,binaryField1,binaryField2," + + "binaryField3','complex_delimiter_level_1'='$', 'complex_delimiter_level_2'='&')") + checkAnswer(sql("SELECT binaryField1[0] FROM carbon_table where id=1"), + sql("SELECT binaryField1[0] 
FROM hive_table where id=1")) + checkAnswer(sql("SELECT binaryField2.b2 FROM carbon_table where id=1"), + sql("SELECT binaryField2.b2 FROM hive_table where id=1")) + checkAnswer(sql("SELECT binaryField3[1] FROM carbon_table where id=1"), + sql("SELECT binaryField3[1] FROM hive_table where id=1")) + sql("drop table if exists carbon_table") + sql("drop table if exists hive_table") + } + } diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 3d3b89d..c331532 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -1511,6 +1511,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { field.dataType.getOrElse("NIL") match { case "String" => Field(parentName + "." + field.column, Some("String"), Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName) + case "binary" => Field(parentName + "." + field.column, Some("Binary"), + Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName) case "SmallInt" => Field(parentName + "." + field.column, Some("SmallInt"), Some(parentName + "." + field.name.getOrElse(None)), Some(null), parentName) case "Integer" => Field(parentName + "." 
+ field.column, Some("Integer"), diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala index 9ebf54f..9a3a9f0 100644 --- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala +++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.carbondata.datasource import java.io.File + import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.sdk.util.BinaryUtil @@ -24,6 +25,7 @@ import org.apache.commons.codec.binary.{Base64, Hex} import org.apache.commons.io.FileUtils import org.apache.spark.sql.Row import org.apache.spark.sql.carbondata.datasource.TestUtil._ +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.util.SparkUtil import org.scalatest.{BeforeAndAfterAll, FunSuite} @@ -635,4 +637,90 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { assert(exception.getMessage.contains("Operation not allowed: DELETE FROM")) } + test("test array of binary data type with sparkfileformat ") { + sql("drop table if exists carbon_table") + sql("drop table if exists parquet_table") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField array<binary>, autoLabel boolean) using carbon") + sql("insert into carbon_table values(1,true,'abc',array('binary'),false)") + sql("create table if not exists parquet_table(id int, label boolean, name string," + + "binaryField array<binary>, autoLabel boolean) using parquet") + sql("insert into parquet_table 
values(1,true,'abc',array('binary'),false)") + checkAnswer(sql("SELECT binaryField[0] FROM carbon_table"), + sql("SELECT binaryField[0] FROM parquet_table")) + sql("drop table if exists carbon_table") + sql("drop table if exists parquet_table") + } + + test("test struct of binary data type with sparkfileformat ") { + sql("drop table if exists carbon_table") + sql("drop table if exists parquet_table") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField struct<b:binary>, autoLabel boolean) using carbon") + sql("insert into carbon_table values(1,true,'abc',named_struct('b','binary'),false)") + sql("create table if not exists parquet_table(id int, label boolean, name string," + + "binaryField struct<b:binary>, autoLabel boolean) using parquet") + sql("insert into parquet_table values(1,true,'abc',named_struct('b','binary'),false)") + checkAnswer(sql("SELECT binaryField.b FROM carbon_table"), + sql("SELECT binaryField.b FROM parquet_table")) + sql("drop table if exists carbon_table") + sql("drop table if exists parquet_table") + } + + test("test map of binary data type with sparkfileformat") { + sql("drop table if exists carbon_table") + sql("drop table if exists parquet_table") + sql("create table if not exists parquet_table(id int, label boolean, name string," + + "binaryField map<int, binary>, autoLabel boolean) using parquet") + sql("insert into parquet_table values(1,true,'abc',map(1,'binary'),false)") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField map<int, binary>, autoLabel boolean) using carbon") + sql("insert into carbon_table values(1,true,'abc',map(1,'binary'),false)") + checkAnswer(sql("SELECT binaryField[1] FROM carbon_table"), + sql("SELECT binaryField[1] FROM parquet_table")) + sql("drop table if exists carbon_table") + sql("drop table if exists parquet_table") + } + + test("test map of array and struct binary data type with sparkfileformat") { + 
sql("drop table if exists carbon_table") + sql("drop table if exists parquet_table") + sql("create table if not exists parquet_table(id int, label boolean, name string," + + "binaryField1 map<int, array<binary>>, binaryField2 map<int, struct<b:binary>> ) " + + "using parquet") + sql("insert into parquet_table values(1,true,'abc',map(1,array('binary')),map(1," + + "named_struct('b','binary')))") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField1 map<int, array<binary>>, binaryField2 map<int, struct<b:binary>> ) " + + "using carbon") + sql("insert into carbon_table values(1,true,'abc',map(1,array('binary')),map(1," + + "named_struct('b','binary')))") + checkAnswer(sql("SELECT binaryField1[1][1] FROM carbon_table"), + sql("SELECT binaryField1[1][1] FROM parquet_table")) + checkAnswer(sql("SELECT binaryField2[1].b FROM carbon_table"), + sql("SELECT binaryField2[1].b FROM parquet_table")) + sql("drop table if exists carbon_table") + } + + test("test of array of struct and struct of array of binary data type with sparkfileformat") { + sql("drop table if exists carbon_table") + sql("drop table if exists parquet_table") + sql("create table if not exists parquet_table(id int, label boolean, name string," + + "binaryField1 array<struct<b1:binary>>, binaryField2 struct<b2:array<binary>> ) " + + "using parquet") + sql("insert into parquet_table values(1,true,'abc',array(named_struct('b1','binary'))," + + "named_struct('b2',array('binary')))") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField1 array<struct<b1:binary>>, binaryField2 struct<b2:array<binary>> ) " + + "using carbon") + sql("insert into carbon_table values(1,true,'abc',array(named_struct('b1','binary'))," + + "named_struct('b2',array('binary')))") + checkAnswer(sql("SELECT binaryField1[1].b1 FROM carbon_table"), + sql("SELECT binaryField1[1].b1 FROM parquet_table")) + checkAnswer(sql("SELECT binaryField2.b2[0] FROM 
carbon_table"), + sql("SELECT binaryField2.b2[0] FROM parquet_table")) + sql("drop table if exists carbon_table") + sql("drop table if exists parquet_table") + } + } diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index 200a9f6..9504974 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -378,6 +378,9 @@ public class PrimitiveDataType implements GenericDataType<Object> { } else { value = ByteUtil.toXorBytes(Long.parseLong(parsedValue)); } + } else if (this.carbonDimension.getDataType().equals(DataTypes.BINARY)) { + value = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(input, + this.carbonDimension.getDataType()); } else { value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue, this.carbonDimension.getDataType(), dateFormat); diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java index 30a881b..6f90155 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java @@ -22,6 +22,7 @@ import junit.framework.TestCase; import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.StructField; import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.LiteralExpression; import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; @@ -1127,4 +1128,44 @@ public class ImageTest 
extends TestCase { } } + @Test public void testBinaryWithComplexType() + throws IOException, InvalidLoadOptionException, InterruptedException { + int num = 1; + int rows = 1; + String path = "./target/binary"; + try { + FileUtils.deleteDirectory(new File(path)); + } catch (IOException e) { + e.printStackTrace(); + } + Field[] fields = new Field[4]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + fields[2] = new Field("arrayField", DataTypes.createArrayType(DataTypes.BINARY)); + ArrayList<StructField> structFields = new ArrayList<>(); + structFields.add(new StructField("b", DataTypes.BINARY)); + fields[3] = new Field("structField", DataTypes.createStructType(structFields)); + + // read and write image data + for (int j = 0; j < num; j++) { + CarbonWriter writer = CarbonWriter.builder().outputPath(path).withCsvInput(new Schema(fields)) + .writtenBy("BinaryExample").withPageSizeInMb(1).build(); + + for (int i = 0; i < rows; i++) { + // write data + writer.write(new String[] { "robot" + (i % 10), String.valueOf(i), "binary1", "binary2" }); + } + writer.close(); + } + CarbonReader reader = CarbonReader.builder(path, "_temp").build(); + while (reader.hasNext()) { + Object[] row = (Object[]) reader.readNextRow(); + Object[] arrayResult = (Object[]) row[1]; + Object[] structResult = (Object[]) row[2]; + assert (new String((byte[]) arrayResult[0]).equalsIgnoreCase("binary1")); + assert (new String((byte[]) structResult[0]).equalsIgnoreCase("binary2")); + } + reader.close(); + } + }