[CARBONDATA-2457] Add converter to get Carbon SDK Schema from Avro schema directly.
In the current implementation, SDK users have to manually build the Carbon schema fields from the Avro schema. This is time-consuming and error-prone, and users should not have to deal with this conversion logic themselves. So, abstract the Carbon schema creation from the Avro schema by exposing a method (AvroCarbonWriter.getCarbonSchemaFromAvroSchema) to the user. This closes #2283 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cf3e9196 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cf3e9196 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cf3e9196 Branch: refs/heads/spark-2.3 Commit: cf3e919651f3b84c3045d1be8fb89a8a8cfd8242 Parents: 747be9b Author: ajantha-bhat <[email protected]> Authored: Tue May 8 12:12:37 2018 +0530 Committer: ravipesala <[email protected]> Committed: Wed May 9 18:07:05 2018 +0530 ---------------------------------------------------------------------- .../TestNonTransactionalCarbonTable.scala | 172 ++----------------- .../carbondata/sdk/file/AvroCarbonWriter.java | 129 +++++++++++++- .../sdk/file/AvroCarbonWriterTest.java | 63 +------ 3 files changed, 145 insertions(+), 219 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf3e9196/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index c641ed3..2f88c40 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -31,7 +31,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.filesystem.CarbonFile import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonUtil -import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema} +import org.apache.carbondata.sdk.file.{AvroCarbonWriter, CarbonWriter, Field, Schema} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -39,8 +39,6 @@ import org.apache.avro import org.apache.commons.lang.CharEncoding import tech.allegro.schema.json2avro.converter.JsonAvroConverter -import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField} - class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { var writerPath = new File(this.getClass.getResource("/").getPath @@ -351,8 +349,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } - - test("read non transactional table, files written from sdk Writer Output)") { buildTestDataSingleFile() assert(new File(writerPath).exists()) @@ -531,7 +527,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } - test("Read sdk writer output file without any file should fail") { buildTestDataSingleFile() deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) @@ -685,11 +680,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { } - - private def WriteFilesWithAvroWriter(rows: Int, - mySchema: String, - json: String, - fields: Array[Field]) = { + private def WriteFilesWithAvroWriter(rows: Int, mySchema: String, json: String): Unit = { // conversion to GenericData.Record val nn = new avro.Schema.Parser().parse(mySchema) val converter = new JsonAvroConverter @@ -697,7 +688,8 @@ 
class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn) try { - val writer = CarbonWriter.builder.withSchema(new Schema(fields)) + val writer = CarbonWriter.builder + .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema)) .outputPath(writerPath).isTransactionalTable(false) .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput var i = 0 @@ -734,16 +726,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { """.stripMargin val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """ - - val fields = new Array[Field](3) - fields(0) = new Field("name", DataTypes.STRING) - fields(1) = new Field("age", DataTypes.INT) - val fld = new util.ArrayList[StructField] - fld.add(new StructField("street", DataTypes.STRING)) - fld.add(new StructField("city", DataTypes.STRING)) - fields(2) = new Field("address", "struct", fld) - - WriteFilesWithAvroWriter(rows, mySchema, json, fields) + WriteFilesWithAvroWriter(rows, mySchema, json) } def buildAvroTestDataStructType(): Any = { @@ -782,17 +765,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { """.stripMargin val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """ - - - val fields = new Array[Field](3) - fields(0) = new Field("name", DataTypes.STRING) - fields(1) = new Field("age", DataTypes.INT) - // fields[1] = new Field("age", DataTypes.INT); - val fld = new util.ArrayList[StructField] - fld.add(new StructField("street", DataTypes.STRING)) - fields(2) = new Field("address", "array", fld) - - WriteFilesWithAvroWriter(rows, mySchema, json, fields) + WriteFilesWithAvroWriter(rows, mySchema, json) } def buildAvroTestDataSingleFileArrayType(): Any = { @@ -836,18 +809,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { """ {"name":"bob", "age":10, |"address" : 
{"street":"abc", "city":"bang"}, |"doorNum" : [1,2,3,4]}""".stripMargin - - val fields = new Array[Field](4) - fields(0) = new Field("name", DataTypes.STRING) - fields(1) = new Field("age", DataTypes.INT) - val fld = new util.ArrayList[StructField] - fld.add(new StructField("street", DataTypes.STRING)) - fld.add(new StructField("city", DataTypes.STRING)) - fields(2) = new Field("address", "struct", fld) - val fld1 = new util.ArrayList[StructField] - fld1.add(new StructField("eachDoorNum", DataTypes.INT)) - fields(3) = new Field("doorNum", "array", fld1) - WriteFilesWithAvroWriter(rows, mySchema, json, fields) + WriteFilesWithAvroWriter(rows, mySchema, json) } def buildAvroTestDataBothStructArrayType(): Any = { @@ -855,7 +817,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { buildAvroTestDataStructWithArrayType(3, null) } - // ArrayOfStruct test def buildAvroTestDataArrayOfStruct(rows: Int, options: util.Map[String, String]): Any = { FileUtils.deleteDirectory(new File(writerPath)) @@ -900,20 +861,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { |{"street":"def","city":"city2"}, |{"street":"ghi","city":"city3"}, |{"street":"jkl","city":"city4"}]} """.stripMargin - - val fields = new Array[Field](3) - fields(0) = new Field("name", DataTypes.STRING) - fields(1) = new Field("age", DataTypes.INT) - - val fld = new util.ArrayList[StructField] - fld.add(new StructField("street", DataTypes.STRING)) - fld.add(new StructField("city", DataTypes.STRING)) - - val fld2 = new util.ArrayList[StructField] - fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld)) - fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2) - - WriteFilesWithAvroWriter(rows, mySchema, json, fields) + WriteFilesWithAvroWriter(rows, mySchema, json) } def buildAvroTestDataArrayOfStructType(): Any = { @@ -921,7 +869,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with 
BeforeAndAfterAll { buildAvroTestDataArrayOfStruct(3, null) } - // StructOfArray test def buildAvroTestDataStructOfArray(rows: Int, options: util.Map[String, String]): Any = { FileUtils.deleteDirectory(new File(writerPath)) @@ -983,21 +930,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { | ] | } |} """.stripMargin - - val fields = new Array[Field](3) - fields(0) = new Field("name", DataTypes.STRING) - fields(1) = new Field("age", DataTypes.INT) - - val fld2 = new util.ArrayList[StructField] - fld2.add(new StructField("street", DataTypes.STRING)) - fld2.add(new StructField("city", DataTypes.STRING)) - - val fld1 = new util.ArrayList[StructField] - fld1.add(new StructField("eachDoorNum", DataTypes.INT)) - fld2.add(new StructField("doorNum", DataTypes.createArrayType(DataTypes.INT), fld1)) - - fields(2) = new Field("address","struct",fld2) - WriteFilesWithAvroWriter(rows, mySchema, json, fields) + WriteFilesWithAvroWriter(rows, mySchema, json) } def buildAvroTestDataStructOfArrayType(): Any = { @@ -1005,7 +938,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { buildAvroTestDataStructOfArray(3, null) } - test("Read sdk writer Avro output Record Type") { buildAvroTestDataStructType() assert(new File(writerPath).exists()) @@ -1014,7 +946,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION |'$writerPath' """.stripMargin) - checkAnswer(sql("select * from sdkOutputTable"), Seq( Row("bob", 10, Row("abc","bang")), Row("bob", 10, Row("abc","bang")), @@ -1073,7 +1004,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } - test("Read sdk writer Avro output with Array of struct") { buildAvroTestDataArrayOfStructType() assert(new File(writerPath).exists()) @@ -1099,7 +1029,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll 
{ cleanTestData() } - // Struct of array test("Read sdk writer Avro output with struct of Array") { buildAvroTestDataStructOfArrayType() @@ -1201,21 +1130,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { | } | ] |} """.stripMargin - - val fields = new Array[Field](3) - fields(0) = new Field("name", DataTypes.STRING) - fields(1) = new Field("age", DataTypes.INT) - - val fld = new util.ArrayList[StructField] - fld.add(new StructField("street", DataTypes.STRING)) - fld.add(new StructField("city", DataTypes.STRING)) - fld.add(new StructField("FloorNum", DataTypes.createArrayType(DataTypes.INT))) - - val fld2 = new util.ArrayList[StructField] - fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld)) - fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2) - - WriteFilesWithAvroWriter(rows, mySchema, json, fields) + WriteFilesWithAvroWriter(rows, mySchema, json) } def buildAvroTestDataMultiLevel3Type(): Any = { @@ -1253,7 +1168,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } - // test multi level -- 3 levels [array of struct of struct of string, int] def buildAvroTestDataMultiLevel3_1(rows: Int, options: util.Map[String, String]): Any = { FileUtils.deleteDirectory(new File(writerPath)) @@ -1333,26 +1247,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { | } | ] |} """.stripMargin - - val fields = new Array[Field](3) - fields(0) = new Field("name", DataTypes.STRING) - fields(1) = new Field("age", DataTypes.INT) - - val fld = new util.ArrayList[StructField] - fld.add(new StructField("street", DataTypes.STRING)) - fld.add(new StructField("city", DataTypes.STRING)) - - val subFld = new util.ArrayList[StructField] - subFld.add(new StructField("wing", DataTypes.STRING)) - subFld.add(new StructField("number", DataTypes.INT)) - fld.add(new StructField("FloorNum", 
DataTypes.createStructType(subFld))) - - // array of struct of struct - val fld2 = new util.ArrayList[StructField] - fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld)) - fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2) - - WriteFilesWithAvroWriter(rows, mySchema, json, fields) + WriteFilesWithAvroWriter(rows, mySchema, json) } def buildAvroTestDataMultiLevel3_1Type(): Any = { @@ -1432,22 +1327,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { | "BuildNum": [[[1,2,3],[4,5,6]],[[10,20,30],[40,50,60]]] | } """.stripMargin - val fields = new Array[Field](3) - fields(0) = new Field("name", DataTypes.STRING) - fields(1) = new Field("age", DataTypes.INT) - - val subFld = new util.ArrayList[StructField] - subFld.add(new StructField("EachDoorNum", DataTypes.INT)) - - val fld = new util.ArrayList[StructField] - fld.add(new StructField("DoorNum", DataTypes.createArrayType(DataTypes.INT), subFld)) - // array of struct of struct - val doorNum = new util.ArrayList[StructField] - doorNum.add(new StructField("FloorNum", - DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.INT)), fld)) - fields(2) = new Field("BuildNum", "array", doorNum) - - WriteFilesWithAvroWriter(rows, mySchema, json, fields) + WriteFilesWithAvroWriter(rows, mySchema, json) } def buildAvroTestDataMultiLevel3_2Type(): Any = { @@ -1486,8 +1366,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } - - // test multi level -- 4 levels [array of array of array of struct] def buildAvroTestDataMultiLevel4(rows: Int, options: util.Map[String, String]): Any = { FileUtils.deleteDirectory(new File(writerPath)) @@ -1566,30 +1444,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { | ] | ] |} """.stripMargin - - val fields = new Array[Field](3) - fields(0) = new Field("name", DataTypes.STRING) - fields(1) = new Field("age", 
DataTypes.INT) - - val subFld = new util.ArrayList[StructField] - subFld.add(new StructField("EachDoorNum", DataTypes.INT)) - - val address = new util.ArrayList[StructField] - address.add(new StructField("street", DataTypes.STRING)) - address.add(new StructField("city", DataTypes.STRING)) - - val fld = new util.ArrayList[StructField] - fld.add(new StructField("DoorNum", - DataTypes.createArrayType(DataTypes.createStructType(address)), - subFld)) - // array of struct of struct - val doorNum = new util.ArrayList[StructField] - doorNum.add(new StructField("FloorNum", - DataTypes.createArrayType( - DataTypes.createArrayType(DataTypes.createStructType(address))), fld)) - fields(2) = new Field("BuildNum", "array", doorNum) - - WriteFilesWithAvroWriter(rows, mySchema, json, fields) + WriteFilesWithAvroWriter(rows, mySchema, json) } def buildAvroTestDataMultiLevel4Type(): Any = { @@ -1615,5 +1470,4 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf3e9196/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java index 946040f..55fd211 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java @@ -24,6 +24,9 @@ import java.util.Random; import java.util.UUID; import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.StructField; import 
org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; import org.apache.carbondata.processing.loading.complexobjects.ArrayObject; @@ -46,7 +49,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; * Writer Implementation to write Avro Record to carbondata file. */ @InterfaceAudience.Internal -class AvroCarbonWriter extends CarbonWriter { +public class AvroCarbonWriter extends CarbonWriter { private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter; private TaskAttemptContext context; @@ -118,12 +121,134 @@ class AvroCarbonWriter extends CarbonWriter { break; default: - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException( + "carbon not support " + type.toString() + " avro type yet"); } return out; } /** + * converts avro schema to carbon schema required by carbonWriter + * + * @param avroSchemaString json formatted avro schema as string + * @return carbon sdk schema + */ + public static org.apache.carbondata.sdk.file.Schema getCarbonSchemaFromAvroSchema( + String avroSchemaString) { + if (avroSchemaString == null) { + throw new UnsupportedOperationException("avro schema string cannot be null"); + } + Schema avroSchema = new Schema.Parser().parse(avroSchemaString); + Field[] carbonField = new Field[avroSchema.getFields().size()]; + int i = 0; + for (Schema.Field avroField : avroSchema.getFields()) { + carbonField[i] = prepareFields(avroField); + i++; + } + return new org.apache.carbondata.sdk.file.Schema(carbonField); + } + + private static Field prepareFields(Schema.Field avroField) { + String FieldName = avroField.name(); + Schema childSchema = avroField.schema(); + Schema.Type type = childSchema.getType(); + switch (type) { + case BOOLEAN: + return new Field(FieldName, DataTypes.BOOLEAN); + case INT: + return new Field(FieldName, DataTypes.INT); + case LONG: + return new Field(FieldName, DataTypes.LONG); + case DOUBLE: + return new 
Field(FieldName, DataTypes.DOUBLE); + case STRING: + return new Field(FieldName, DataTypes.STRING); + case FLOAT: + return new Field(FieldName, DataTypes.DOUBLE); + case RECORD: + // recursively get the sub fields + ArrayList<StructField> structSubFields = new ArrayList<>(); + for (Schema.Field avroSubField : childSchema.getFields()) { + structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema())); + } + return new Field(FieldName, "struct", structSubFields); + case ARRAY: + // recursively get the sub fields + ArrayList<StructField> arraySubField = new ArrayList<>(); + // array will have only one sub field. + arraySubField.add(prepareSubFields("val", childSchema.getElementType())); + return new Field(FieldName, "array", arraySubField); + default: + throw new UnsupportedOperationException( + "carbon not support " + type.toString() + " avro type yet"); + } + } + + private static StructField prepareSubFields(String FieldName, Schema childSchema) { + Schema.Type type = childSchema.getType(); + switch (type) { + case BOOLEAN: + return new StructField(FieldName, DataTypes.BOOLEAN); + case INT: + return new StructField(FieldName, DataTypes.INT); + case LONG: + return new StructField(FieldName, DataTypes.LONG); + case DOUBLE: + return new StructField(FieldName, DataTypes.DOUBLE); + case STRING: + return new StructField(FieldName, DataTypes.STRING); + case FLOAT: + return new StructField(FieldName, DataTypes.DOUBLE); + case RECORD: + // recursively get the sub fields + ArrayList<StructField> structSubFields = new ArrayList<>(); + for (Schema.Field avroSubField : childSchema.getFields()) { + structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema())); + } + return (new StructField(FieldName, DataTypes.createStructType(structSubFields))); + case ARRAY: + // recursively get the sub fields + // array will have only one sub field. 
+ return (new StructField(FieldName, DataTypes.createArrayType( + getMappingDataTypeForArrayRecord(childSchema.getElementType())))); + default: + throw new UnsupportedOperationException( + "carbon not support " + type.toString() + " avro type yet"); + } + } + + private static DataType getMappingDataTypeForArrayRecord(Schema childSchema) { + switch (childSchema.getType()) { + case BOOLEAN: + return DataTypes.BOOLEAN; + case INT: + return DataTypes.INT; + case LONG: + return DataTypes.LONG; + case DOUBLE: + return DataTypes.DOUBLE; + case STRING: + return DataTypes.STRING; + case FLOAT: + return DataTypes.DOUBLE; + case RECORD: + // recursively get the sub fields + ArrayList<StructField> structSubFields = new ArrayList<>(); + for (Schema.Field avroSubField : childSchema.getFields()) { + structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema())); + } + return DataTypes.createStructType(structSubFields); + case ARRAY: + // array will have only one sub field. + return DataTypes.createArrayType( + getMappingDataTypeForArrayRecord(childSchema.getElementType())); + default: + throw new UnsupportedOperationException( + "carbon not support " + childSchema.getType().toString() + " avro type yet"); + } + } + + /** * Write single row data, input row is Avro Record */ @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf3e9196/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java index 105fb6d..163512a 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java @@ -21,30 +21,20 @@ import java.io.File; import java.io.FileFilter; import 
java.io.IOException; import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.List; import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.metadata.datatype.ArrayType; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.datatype.StructField; -import org.apache.carbondata.core.metadata.datatype.StructType; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.CharEncoding; -import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Test; -import scala.Array; import tech.allegro.schema.json2avro.converter.JsonAvroConverter; import org.apache.avro.Schema; -import static org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.InputType.file; public class AvroCarbonWriterTest { private String path = "./AvroCarbonWriterSuiteWriteFiles"; @@ -70,13 +60,9 @@ public class AvroCarbonWriterTest { GenericData.Record record = converter.convertToGenericDataRecord( json.getBytes(CharEncoding.UTF_8), new Schema.Parser().parse(avroSchema)); - Field[] fields = new Field[2]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("age", DataTypes.STRING); - try { CarbonWriter writer = CarbonWriter.builder() - .withSchema(new org.apache.carbondata.sdk.file.Schema(fields)) + .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema)) .outputPath(path) .isTransactionalTable(true) .buildWriterForAvroInput(); @@ -145,19 +131,9 @@ public class AvroCarbonWriterTest { GenericData.Record record = converter.convertToGenericDataRecord( json.getBytes(CharEncoding.UTF_8), new Schema.Parser().parse(avroSchema)); - Field[] fields = new Field[6]; - // fields[0] = new 
Field("mynull", DataTypes.NULL); - fields[0] = new Field("myboolean", DataTypes.BOOLEAN); - fields[1] = new Field("myint", DataTypes.INT); - fields[2] = new Field("mylong", DataTypes.LONG); - fields[3] = new Field("myfloat", DataTypes.DOUBLE); - fields[4] = new Field("mydouble", DataTypes.DOUBLE); - fields[5] = new Field("mystring", DataTypes.STRING); - - try { CarbonWriter writer = CarbonWriter.builder() - .withSchema(new org.apache.carbondata.sdk.file.Schema(fields)) + .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema)) .outputPath(path) .isTransactionalTable(true) .buildWriterForAvroInput(); @@ -250,18 +226,9 @@ public class AvroCarbonWriterTest { GenericData.Record record = converter.convertToGenericDataRecord( json.getBytes(CharEncoding.UTF_8), nn); - Field[] fields = new Field[3]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("name1", DataTypes.STRING); - // fields[1] = new Field("age", DataTypes.INT); - List fld = new ArrayList<StructField>(); - fld.add(new StructField("street", DataTypes.STRING)); - fld.add(new StructField("city", DataTypes.STRING)); - fields[2] = new Field("address", "struct", fld); - try { CarbonWriter writer = CarbonWriter.builder() - .withSchema(new org.apache.carbondata.sdk.file.Schema(fields)) + .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema)) .outputPath(path) .isTransactionalTable(true) .buildWriterForAvroInput(); @@ -323,18 +290,9 @@ public class AvroCarbonWriterTest { GenericData.Record record = converter.convertToGenericDataRecord( json.getBytes(CharEncoding.UTF_8), nn); - Field[] fields = new Field[3]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("name1", DataTypes.STRING); - // fields[1] = new Field("age", DataTypes.INT); - List fld = new ArrayList<StructField>(); - fld.add(new StructField("street", DataTypes.STRING)); - fld.add(new StructField("city", DataTypes.STRING)); - fields[2] = new Field("address", "struct", fld); - try { 
CarbonWriter writer = CarbonWriter.builder() - .withSchema(new org.apache.carbondata.sdk.file.Schema(fields)) + .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema)) .outputPath(path) .isTransactionalTable(true) .buildWriterForAvroInput(); @@ -365,17 +323,6 @@ public class AvroCarbonWriterTest { private void WriteAvroComplexData(String mySchema, String json, String[] sortColumns) throws UnsupportedEncodingException, IOException, InvalidLoadOptionException { - Field[] fields = new Field[4]; - fields[0] = new Field("name", DataTypes.STRING); - fields[1] = new Field("name1", DataTypes.STRING); - // fields[1] = new Field("age", DataTypes.INT); - List fld = new ArrayList<StructField>(); - fld.add(new StructField("street", DataTypes.STRING)); - fld.add(new StructField("city", DataTypes.STRING)); - fields[2] = new Field("address", "struct", fld); - List fld1 = new ArrayList<StructField>(); - fld1.add(new StructField("eachDoorNum", DataTypes.INT)); - fields[3] = new Field("doorNum","array",fld1); // conversion to GenericData.Record Schema nn = new Schema.Parser().parse(mySchema); @@ -385,7 +332,7 @@ public class AvroCarbonWriterTest { try { CarbonWriter writer = CarbonWriter.builder() - .withSchema(new org.apache.carbondata.sdk.file.Schema(fields)) + .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema)) .outputPath(path) .isTransactionalTable(true).sortBy(sortColumns) .buildWriterForAvroInput();
