Repository: carbondata
Updated Branches:
  refs/heads/master f5c7a19b8 -> 68b359e15
[CARBONDATA-2894] Add support for complex map type through spark carbon file format API

This PR adds support for loading and querying the complex map type through the
Spark carbon file format API.

This closes #2663


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/68b359e1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/68b359e1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/68b359e1

Branch: refs/heads/master
Commit: 68b359e156780cb92defdce3033ed5c2e1d7e744
Parents: f5c7a19
Author: manishgupta88 <[email protected]>
Authored: Mon Aug 27 19:17:21 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Sat Sep 8 14:22:02 2018 +0530

----------------------------------------------------------------------
 .../datasources/CarbonSparkDataSourceUtil.scala |  12 +-
 .../datasources/SparkCarbonFileFormat.scala     |  23 +++-
 .../datasource/SparkCarbonDataSourceTest.scala  | 126 ++++++++++++++++++-
 .../carbondata/sdk/file/AvroCarbonWriter.java   |   4 +-
 .../sdk/file/CarbonWriterBuilder.java           |   5 +-
 .../org/apache/carbondata/sdk/file/Field.java   |  59 +++++++++
 6 files changed, 213 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
index 77c1dce..b097320 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField, StructType => CarbonStructType}
+import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
@@ -78,6 +78,10 @@
              convertSparkToCarbonDataType(field.dataType)))
        }
        CarbonDataTypes.createStructType(carbonFields)
+     case MapType(keyType, valueType, _) =>
+       val keyDataType: CarbonDataType = convertSparkToCarbonDataType(keyType)
+       val valueDataType: CarbonDataType = convertSparkToCarbonDataType(valueType)
+       CarbonDataTypes.createMapType(keyDataType, valueDataType)
     case NullType => CarbonDataTypes.NULL
     case decimal: DecimalType =>
       CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
@@ -196,11 +200,7 @@ object CarbonSparkDataSourceUtil {
       dataSchema: StructType): CarbonLoadModel = {
     val schema = new Schema(dataSchema.fields.map { field =>
       val dataType = convertSparkToCarbonDataType(field.dataType)
-      dataType match {
-        case s: CarbonStructType =>
-          new Field(field.name, s, s.getFields)
-        case _ => new Field(field.name, dataType)
-      }
+      new Field(field.name, dataType)
     })
     val builder = new CarbonWriterBuilder
     builder.isTransactionalTable(false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 406e2c9..a5e1b39 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUn
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types._
@@ -200,8 +200,10 @@ class SparkCarbonFileFormat extends FileFormat
           data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
         case s: StructType =>
           data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
-        case s: ArrayType =>
-          data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+        case a: ArrayType =>
+          data(i) = new ArrayObject(extractData(row.getArray(i), a.elementType))
+        case m: MapType =>
+          data(i) = extractMapData(row.getMap(i), m)
         case d: DateType =>
           data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
         case d: TimestampType =>
@@ -217,6 +219,15 @@ class SparkCarbonFileFormat extends FileFormat
     data
   }
 
+  private def extractMapData(data: AnyRef, mapType: MapType): ArrayObject = {
+    val mapData = data.asInstanceOf[MapData]
+    val keys = extractData(mapData.keyArray(), mapType.keyType)
+    val values = extractData(mapData.valueArray(), mapType.valueType)
+    new ArrayObject(keys.zip(values).map { case (key, value) =>
+      new StructObject(Array(key, value))
+    })
+  }
+
   private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = {
     dataType match {
       case d: DateType =>
@@ -241,8 +252,10 @@ class SparkCarbonFileFormat extends FileFormat
           data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
         case s: StructType =>
           data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
-        case s: ArrayType =>
-          data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+        case a: ArrayType =>
+          data(i) = new ArrayObject(extractData(row.getArray(i), a.elementType))
+        case m: MapType =>
+          data(i) = extractMapData(row.getMap(i), m)
         case d: DateType =>
           data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
         case d: TimestampType =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index dcc76d8..66c0224 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -148,7 +148,6 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
 
     df.write
       .format("parquet").saveAsTable("parquet_table")
-    spark.sql("describe parquet_table").show(false)
     spark.sql("create table carbon_table(c1 string, c2 struct<a1:string, a2:string>, number int) using carbon")
     spark.sql("insert into carbon_table select * from parquet_table")
     assert(spark.sql("select * from carbon_table").count() == 10)
@@ -212,6 +211,131 @@
     spark.sql("drop table if exists parquet_table")
   }
 
+  test("test write with array type with value as nested map type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Array(Map("b" -> "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 array<map<string,string>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with struct type with value as nested map type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, ("a", Map("b" -> "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 struct<a1:string, a2:map<string,string>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map("b" -> "c"), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 map<string, string>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type with Int data type as key") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map(99 -> "c"), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 map<int, string>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type with value as nested map type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map("a" -> Map("b" -> "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 map<string, map<string, string>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type with value as nested struct type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map("a" -> ("b", "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 map<string, struct<a1:string, a2:string>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test write with map type with value as nested array type") {
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+    import spark.implicits._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, Map("a" -> Array("b", "c")), x))
+      .toDF("c1", "c2", "number")
+
+    df.write
+      .format("parquet").saveAsTable("parquet_table")
+    spark.sql("create table carbon_table(c1 string, c2 map<string, array<string>>, number int) using carbon")
+    spark.sql("insert into carbon_table select * from parquet_table")
+    assert(spark.sql("select * from carbon_table").count() == 10)
+    TestUtil.checkAnswer(spark.sql("select * from carbon_table"), spark.sql("select * from parquet_table"))
+    spark.sql("drop table if exists carbon_table")
+    spark.sql("drop table if exists parquet_table")
+  }
 
   test("test write using ddl and options") {
     spark.sql("drop table if exists carbon_table")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index d1e936e..14dbe16 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -468,7 +468,7 @@ public class AvroCarbonWriter extends CarbonWriter {
       case MAP:
         // recursively get the sub fields
         ArrayList<StructField> mapSubFields = new ArrayList<>();
-        StructField mapField = prepareSubFields("val", childSchema);
+        StructField mapField = prepareSubFields(fieldName, childSchema);
        if (null != mapField) {
           // key value field will be wrapped inside a map struct field
           StructField keyValueField = mapField.getChildren().get(0);
@@ -575,7 +575,7 @@ public class AvroCarbonWriter extends CarbonWriter {
       keyValueFields.add(keyField);
       keyValueFields.add(valueField);
       StructField mapKeyValueField =
-          new StructField(fieldName, DataTypes.createStructType(keyValueFields));
+          new StructField(fieldName + ".val", DataTypes.createStructType(keyValueFields));
       // value dataType will be at position 1 in the fields
       MapType mapType =
           DataTypes.createMapType(DataTypes.STRING, mapKeyValueField.getDataType());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 76dd7aa..56757e4 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.MapType;
 import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
@@ -673,8 +674,8 @@ public class CarbonWriterBuilder {
           .addColumn(new StructField(field.getFieldName(), complexType), valIndex, false);
     } else if (field.getDataType().getName().equalsIgnoreCase("MAP")) {
       // Loop through the inner columns for MapType
-      DataType mapType =
-          DataTypes.createMapType(DataTypes.STRING, field.getChildren().get(0).getDataType());
+      DataType mapType = DataTypes.createMapType(((MapType) field.getDataType()).getKeyType(),
+          field.getChildren().get(0).getDataType());
       tableSchemaBuilder
           .addColumn(new StructField(field.getFieldName(), mapType), valIndex, false);
     }
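
One practical effect of the CarbonWriterBuilder change above: the key type
declared for a map column in the SDK schema is now propagated into the table
schema instead of being hard-coded to DataTypes.STRING. A minimal sketch of a
schema that exercises this path (the class name and the "c2" column name are
illustrative, not part of this commit):

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.sdk.file.Field;
    import org.apache.carbondata.sdk.file.Schema;

    public class MapKeyTypeSketch {
      public static void main(String[] args) {
        // map<int, string> column: with this fix the INT key type is read
        // from the MapType itself and survives into the table schema
        Field mapField =
            new Field("c2", DataTypes.createMapType(DataTypes.INT, DataTypes.STRING));
        Schema schema = new Schema(new Field[] { mapField });
        System.out.println(schema.getFields()[0].getDataType());
      }
    }
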
http://git-wip-us.apache.org/repos/asf/carbondata/blob/68b359e1/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index 1c5ab52..6903200 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -17,14 +17,18 @@
 
 package org.apache.carbondata.sdk.file;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.metadata.datatype.ArrayType;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.MapType;
 import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.datatype.StructType;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 /**
@@ -130,6 +134,7 @@ public class Field {
   public Field(String name, DataType type) {
     this.name = name;
     this.type = type;
+    initComplexTypeChildren();
   }
 
   /**
@@ -218,4 +223,58 @@
   public void updateNameToLowerCase() {
     this.name = name.toLowerCase();
   }
+
+  private void initComplexTypeChildren() {
+    if (getDataType().isComplexType()) {
+      StructField subFields = prepareSubFields(getFieldName(), getDataType());
+      if (DataTypes.isArrayType(getDataType()) || DataTypes.isMapType(getDataType())) {
+        children = subFields.getChildren();
+      } else if (DataTypes.isStructType(getDataType())) {
+        children = ((StructType) subFields.getDataType()).getFields();
+      }
+    }
+  }
+
+  /**
+   * prepare sub fields for complex types
+   *
+   * @param fieldName column name
+   * @param dataType data type of column or it's children
+   * @return
+   */
+  private StructField prepareSubFields(String fieldName, DataType dataType) {
+    if (DataTypes.isArrayType(dataType)) {
+      List<StructField> arrayFields = new ArrayList<>();
+      StructField arrayField =
+          prepareSubFields(fieldName, ((ArrayType) dataType).getElementType());
+      arrayFields.add(arrayField);
+      return new StructField(fieldName, DataTypes.createArrayType(arrayField.getDataType()),
+          arrayFields);
+    } else if (DataTypes.isStructType(dataType)) {
+      List<StructField> structFields = new ArrayList<>();
+      List<StructField> fields = ((StructType) dataType).getFields();
+      for (StructField field : fields) {
+        structFields.add(prepareSubFields(field.getFieldName(), field.getDataType()));
+      }
+      return new StructField(fieldName, DataTypes.createStructType(structFields), structFields);
+    } else if (DataTypes.isMapType(dataType)) {
+      // Internally Map<key, value> is stored as Array<struct<key, value>>. So the below method
+      // will convert a map type into similar field structure. The columnSchema will be formed
+      // as Map<Struct<key,value>>
+      List<StructField> mapFields = new ArrayList<>();
+      MapType mapType = (MapType) dataType;
+      // key is primitive type so type can be fetched directly
+      StructField keyField = new StructField(fieldName + ".key", mapType.getKeyType());
+      StructField valueField = prepareSubFields(fieldName + ".value", mapType.getValueType());
+      mapFields.add(keyField);
+      mapFields.add(valueField);
+      StructField field =
+          new StructField(fieldName + ".val", DataTypes.createStructType(mapFields));
+      MapType mapDataType = DataTypes.createMapType(keyField.getDataType(), field.getDataType());
+      List<StructField> mapStructField = new ArrayList<>();
+      mapStructField.add(field);
+      return new StructField(fieldName, mapDataType, mapStructField);
+    } else {
+      return new StructField(fieldName, dataType);
+    }
+  }
 }
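
The Field constructor above now derives children for complex types, modelling
a map column the way it is stored internally: as array<struct<key, value>>.
prepareSubFields() hangs a single "<column>.val" struct child (wrapping
"<column>.key" and "<column>.value") off the map field, and the Spark-side
extractMapData() builds the matching array of two-element StructObjects at
write time. A small sketch that inspects this layout (the class name and the
"props" column name are illustrative; the expected output shape is inferred
from the code above):

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.core.metadata.datatype.StructField;
    import org.apache.carbondata.sdk.file.Field;

    public class MapFieldLayoutSketch {
      public static void main(String[] args) {
        Field mapField = new Field("props",
            DataTypes.createMapType(DataTypes.STRING, DataTypes.STRING));
        // expected single child: "props.val" of struct type, whose own
        // children are "props.key" and "props.value"
        for (StructField child : mapField.getChildren()) {
          System.out.println(child.getFieldName() + " -> " + child.getDataType());
        }
      }
    }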
