Repository: carbondata
Updated Branches:
  refs/heads/master f012f5b13 -> b588cb655


[CARBONDATA-2876]Support Avro datatype conversion through SDK

This PR adds support for converting the following Avro data types to carbon
format through the SDK (a brief usage sketch follows the description below):
1. Avro Union
2. Avro Enum
3. Avro logical type Decimal

This closes #2671
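
Minimal usage sketch (not part of this patch): it writes one Avro record
containing a union field and an enum field through the SDK writer. The class
name, schema and output path below are illustrative assumptions; the builder
calls mirror those used in the new test suite.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

import org.apache.carbondata.sdk.file.CarbonWriter;

public class AvroUnionEnumSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical schema with a union field and an enum field
    String avroSchema = "{\"type\":\"record\",\"name\":\"StudentActivity\",\"fields\":["
        + "{\"name\":\"first\",\"type\":[\"null\",\"long\"]},"
        + "{\"name\":\"suit\",\"type\":{\"type\":\"enum\",\"name\":\"Suit\","
        + "\"symbols\":[\"SPADES\",\"HEARTS\"]}}]}";
    Schema schema = new Schema.Parser().parse(avroSchema);

    GenericData.Record record = new GenericData.Record(schema);
    // Only the union branch matching the value is filled; it is stored as a struct
    record.put("first", 10345L);
    // Enum values are stored as carbon STRING
    record.put("suit", new GenericData.EnumSymbol(schema.getField("suit").schema(), "HEARTS"));

    CarbonWriter writer = CarbonWriter.builder()
        .outputPath("/tmp/avro_sdk_output")   // assumed output location
        .isTransactionalTable(false)
        .buildWriterForAvroInput(schema);
    writer.write(record);
    writer.close();
  }
}

When read back through an external table, the union appears as a struct whose
children are named after the field plus a branch index (e.g. first0), and the
enum as a plain string, as the new tests below suggest.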


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b588cb65
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b588cb65
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b588cb65

Branch: refs/heads/master
Commit: b588cb65564d26cdf55da7482ae7b1ee79173067
Parents: f012f5b
Author: Indhumathi27 <indhumathi...@gmail.com>
Authored: Thu Aug 30 14:50:06 2018 +0530
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Fri Aug 31 14:41:56 2018 +0530

----------------------------------------------------------------------
 ...ansactionalCarbonTableWithAvroDataType.scala | 793 +++++++++++++++++++
 .../carbondata/sdk/file/AvroCarbonWriter.java   | 331 +++++++-
 2 files changed, 1088 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b588cb65/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
new file mode 100644
index 0000000..b50407c
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
@@ -0,0 +1,793 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.File
+import scala.collection.mutable
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.sdk.file.CarbonWriter
+
+/**
+ * Test class for Avro supported data types through SDK
+ */
+class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with BeforeAndAfterAll {
+
+
+  val badRecordAction = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION)
+
+  var writerPath = new File(this.getClass.getResource("/").getPath
+                            +
+                            "../." +
+                            "./target/SparkCarbonFileFormat/WriterOutput/")
+    .getCanonicalPath
+
+  writerPath = writerPath.replace("\\", "/")
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "force")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, badRecordAction)
+  }
+
+  test("test enum") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |      "namespace": "com.apache.schema",
+        |      "type": "record",
+        |      "name": "StudentActivity",
+        | "fields":
+        | [{
+        | "name": "id",
+        |                                              "type": {
+        |                    "type": "enum",
+        |                    "name": "Suit",
+        |                    "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
+        |                    }
+        |                }
+        |            ]
+        |}""".stripMargin
+
+    val json1 =
+      """{"id":"HEARTS"}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("HEARTS")))
+  }
+
+  test("test enum with struct type") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val mySchema =
+      "{ " +
+      "  \"name\": \"address\", " +
+      "  \"type\": \"record\", " +
+      "  \"fields\": [ " +
+      "    { " +
+      "      \"name\": \"name\", " +
+      "      \"type\": \"string\" " +
+      "    }, " +
+      "    { " +
+      "      \"name\": \"age\", " +
+      "      \"type\": \"int\" " +
+      "    }, " +
+      "    { " +
+      "       \"name\": \"address\",  \"type\": {" +
+      "        \"type\" : \"record\",  \"name\" : \"my_address\"," +
+      "         \"fields\" : [" +
+      "         {\"name\": \"enumRec\", " +
+      "           \"type\": { " +
+      "            \"type\": \"enum\", " +
+      "             \"name\": \"card\", " +
+      "             \"symbols\": [\"SPADES\", \"HEARTS\", \"DIAMONDS\", \"CLUBS\"] " +
+      "      } " +
+      "}]}" +
+      "    } " +
+      "  ] " +
+      "} "
+
+    val json1 = "{\"name\":\"bob\", \"age\":10, \"address\": {\"enumRec\":\"SPADES\"}}"
+
+    val nn = new org.apache.avro.Schema.Parser().parse(mySchema)
+    val record = testUtil.jsonToAvro(json1, mySchema)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("bob", 10, Row("SPADES"))))
+  }
+
+  test("test enum with Array type") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val mySchema =
+      """ {
+        |      "name": "address",
+        |      "type": "record",
+        |      "fields": [
+        |      {
+        |      "name": "name",
+        |      "type": "string"
+        |      },
+        |      {
+        |      "name": "age",
+        |      "type": "int"
+        |      },
+        |      {
+        |      "name": "address",
+        |      "type": {
+        |      "type": "array",
+        |      "items": {
+        |                    "name": "Suit",
+        |                    "type": "enum",
+        |                    "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
+        |      }}}]
+        |  }
+      """.stripMargin
+
+    val json: String = """ {"name": "bob","age": 10,"address": ["SPADES", "DIAMONDS"]} """
+
+    val nn = new org.apache.avro.Schema.Parser().parse(mySchema)
+    val record = testUtil.jsonToAvro(json, mySchema)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"),
+      Seq(Row("bob", 10, mutable.WrappedArray.make(Array("SPADES", "DIAMONDS")))))
+  }
+
+  test("test union type long") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |      "namespace": "com.apache.schema",
+        |      "type": "record",
+        |      "name": "StudentActivity",
+        |      "fields": [
+        |       { "name": "first", "type": ["string", "int", "long"] }
+        |     ]
+        |}""".stripMargin
+
+    val json1 =
+      """{"first":{"long":10345}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null, null, 10345))))
+  }
+
+  test("test union type boolean") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |      "namespace": "com.apache.schema",
+        |      "type": "record",
+        |      "name": "StudentActivity",
+        |      "fields": [
+        |       { "name": "first", "type": ["boolean", "int", "long"] }
+        |     ]
+        |}""".stripMargin
+
+    val json1 =
+      """{"first":{"boolean":true}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(true, null, null))))
+  }
+
+  test("test union type string") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |      "namespace": "com.apache.schema",
+        |      "type": "record",
+        |      "name": "StudentActivity",
+        |      "fields": [
+        |       { "name": "first", "type": ["string", "int", "long"] }
+        |     ]
+        |}""".stripMargin
+
+    val json1 =
+      """{"first":{"string":"abc"}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row("abc", null, null))))
+  }
+
+  test("test union type int") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |      "namespace": "com.apache.schema",
+        |      "type": "record",
+        |      "name": "StudentActivity",
+        |      "fields": [
+        |       { "name": "first", "type": ["string", "int", "long"] }
+        |     ]
+        |}""".stripMargin
+
+    val json1 =
+      """{"first":{"int":10}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null, 10, null))))
+  }
+
+  test("test union type with null") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |      "namespace": "com.apache.schema",
+        |      "type": "record",
+        |      "name": "StudentActivity",
+        |      "fields": [
+        |       { "name": "first", "type": ["null", "int"] }
+        |     ]
+        |}""".stripMargin
+
+    val json1 =
+      """{"first":{"null":null}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null))))
+  }
+
+  test("test union type with only type null") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |      "namespace": "com.apache.schema",
+        |      "type": "record",
+        |      "name": "StudentActivity",
+        |      "fields": [
+        |       { "name": "first", "type": ["null"] }
+        |     ]
+        |}""".stripMargin
+
+    val json1 =
+      """{"first":{"null":null}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+    val exception1 = intercept[UnsupportedOperationException] {
+      val writer = CarbonWriter.builder
+        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+      writer.write(record)
+      writer.close()
+    }
+    assert(exception1.getMessage
+      .contains("Carbon do not support Avro UNION with only null type"))
+  }
+
+  test("test union type with Enum") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |      "namespace": "com.apache.schema",
+        |      "type": "record",
+        |      "name": "StudentActivity",
+        |      "fields": [
+        |      {
+        |                "name": "enum_field", "type": [{
+        |          "namespace": "org.example.avro",
+        |          "name": "EnumField",
+        |          "type": "enum",
+        |          "symbols": [
+        |                              "VAL_0",
+        |                              "VAL_1"
+        |                      ]
+        |        },"null"], "default": null
+        |      }]
+        |}""".stripMargin
+
+    val json1 =
+      """{"enum_field":{"org.example.avro.EnumField":"VAL_0"}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row("VAL_0"))))
+  }
+
+  test("test union type with Map") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |      "namespace": "com.apache.schema",
+        |      "type": "record",
+        |      "name": "StudentActivity",
+        |      "fields": [
+        |      {
+        |                "name": "map_field", "type": [{
+        |          "namespace": "org.example.avro",
+        |          "name": "mapField",
+        |          "type": "map",
+        |          "values":"string"
+        |        },"int"], "default": null
+        |      }]
+        |}""".stripMargin
+
+    val json1 =
+      """{"map_field":{"map":{"street": "k-lane", "city": "bangalore"}}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    sql("select * from sdkOutputTable").show(false)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row(Row(Map("city" -> "bangalore", "street" -> "k-lane"), null))))
+  }
+
+  test("test union type") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |      "namespace": "com.apache.schema",
+        |      "type": "record",
+        |      "name": "StudentActivity",
+        |      "fields": [
+        |      {
+        |                "name": "struct_field", "type": [{
+        |          "namespace": "org.example.avro",
+        |          "name": "structField",
+        |          "type": "array",
+        |          "items": { "name" : "name0", "type":"string"}
+        |        },"int"], "default": null
+        |      }]
+        |}""".stripMargin
+
+    val json1 =
+      """{"struct_field":{"int":12}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"),
+      Seq(Row(Row(mutable.WrappedArray.make(Array(null)), 12))))
+  }
+
+  test("test Struct of Union") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """
+        |{"name": "address",
+        | "type": "record",
+        | "fields": [
+        |  { "name": "address",  "type": {
+        |    "type" : "record",  "name" : "my_address",
+        |        "fields" : [
+        |    {"name": "city", "type": ["string","int"]}]}}
+        |]}
+      """.stripMargin
+
+    val json1 =
+      """{"address":{"city":{"int":1}}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    sql("describe formatted sdkOutputTable").show(false)
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(Row(null, 1)))))
+    sql("insert into sdkOutputTable values('abc:12')")
+    sql("select address.city.city0 from sdkOutputTable").show(false)
+  }
+
+  test("test Union with struct of array") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """
+        |{"name": "address",
+        | "type": "record",
+        | "fields": [
+        |  { "name": "address",  "type": {
+        |    "type" : "record",  "name" : "my_address",
+        |        "fields" : [
+        |    {"name": "city", "type": ["string",  {
+        |                "type": "array",
+        |                "name": "abc_name_0",
+        |                "items": {
+        |                  "name": "_name_0",
+        |                  "type": "record",
+        |                  "fields": [
+        |                    {
+        |                      "name": "app_id",
+        |                      "type": [
+        |                        "null",
+        |                        "string"
+        |                      ]
+        |                    }
+        |                    ]
+        |                    }
+        |                    }
+        |                    ]}]}}
+        |]}
+      """.stripMargin
+
+    val json1 =
+      """{
+        |"address":{"city":
+        |{"array":[
+        |        {
+        |          "app_id": {
+        |            "string": "abc"
+        |          }}]
+        |          }}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkAnswer(sql("select * from sdkOutputTable"),
+      Seq(Row(Row(Row(null, mutable.WrappedArray.make(Array(Row(Row("abc")))))))))
+  }
+
+  test("test union type with Array and Struct") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |  "type": "record",
+        |  "namespace": "example.avro",
+        |  "name": "array_union",
+        |  "fields": [
+        |    {
+        |      "name": "body",
+        |      "type": {
+        |        "name": "body",
+        |        "type": "record",
+        |        "fields": [
+        |          {
+        |            "name": "abc",
+        |            "type": [
+        |              "int",
+        |              {
+        |                "type": "array",
+        |                "name": "abc_name_0",
+        |                "items": {
+        |                  "name": "_name_0",
+        |                  "type": "record",
+        |                  "fields": [
+        |                    {
+        |                      "name": "app_id",
+        |                      "type": [
+        |                        "null",
+        |                        "string"
+        |                      ]
+        |                    },
+        |                    {
+        |                      "name": "app_name",
+        |                      "type": [
+        |                        "int",
+        |                        "float",
+        |                        "string"
+        |                      ]
+        |                    },
+        |                    {
+        |                      "name": "app_key",
+        |                      "type": [
+        |                        "null",
+        |                        "string"
+        |                      ]
+        |                    }
+        |                  ]
+        |                }
+        |              }
+        |            ]
+        |          }
+        |        ]
+        |      }
+        |    }
+        |  ]
+        |}""".stripMargin
+
+    val json1 =
+      """{
+        |  "body": {
+        |    "abc": {
+        |      "array": [
+        |        {
+        |          "app_id": {
+        |            "string": "abc"
+        |          },
+        |          "app_name": {
+        |            "string": "bcd"
+        |          },
+        |          "app_key": {
+        |            "string": "cde"
+        |          }
+        |        }
+        |      ]
+        |    }
+        |  }
+        |}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    sql("describe formatted sdkOutputTable").show(false)
+    checkAnswer(sql("select * from sdkOutputTable"),
+      Seq(Row(Row(Row(null,
+        mutable.WrappedArray.make(Array(Row(Row("abc"), Row(null, null, "bcd"), Row("cde")))))))))
+  }
+
+  test("test union type with Decimal") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """{
+        |      "namespace": "com.apache.schema",
+        |      "type": "record",
+        |      "name": "StudentActivity",
+        |      "fields": [
+        |      {
+        |                "name": "enum_field", "type": [{
+        |          "namespace": "org.example.avro",
+        |          "name": "dec",
+        |          "type": "bytes",
+        |         "logicalType": "decimal",
+        |                     "precision": 10,
+        |                     "scale": 2
+        |        },"null"]
+        |      }]
+        |}""".stripMargin
+
+    val json1 =
+      """{"enum_field":{"bytes":"1010"}}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id decimal(4,3)) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkExistence(sql("select * from sdkOutputTable"), true, "1010.00")
+  }
+
+  test("test logical type decimal with struct") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """
+        |{"name": "address",
+        | "type": "record",
+        | "fields": [
+        |  { "name": "name", "type": "string"},
+        |  { "name": "age", "type": "float"},
+        |  { "name": "address",  "type": {
+        |    "type" : "record",  "name" : "my_address",
+        |        "fields" : [
+        |    {"name": "street", "type": "string"},
+        |    {"name": "city", "type": {"type" : "bytes",
+        |                     "logicalType": "decimal",
+        |                     "precision": 4,
+        |                     "scale": 2
+        |                    }}]}}
+        |]}
+      """.stripMargin
+
+    val json1 = """ {"name":"bob", "age":10.24, "address" : {"street":"abc", "city":"32"}} """
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id struct<b:decimal(4,3)>) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkExistence(sql("select * from sdkOutputTable"), true, "32.00")
+  }
+
+  test("test logical type decimal with Array") {
+    sql("drop table if exists sdkOutputTable")
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(writerPath))
+    val schema1 =
+      """ {
+        |      "name": "address",
+        |      "type": "record",
+        |      "fields": [
+        |      {
+        |      "name": "name",
+        |      "type": "string"
+        |      },
+        |      {
+        |      "name": "age",
+        |      "type": "int"
+        |      },
+        |      {
+        |      "name": "address",
+        |      "type": {
+        |      "type": "array",
+        |      "items": {
+        |      "name": "street",
+        |      "type": "bytes",
+        |      "logicalType": "decimal",
+        |      "precision": 4,
+        |      "scale": 1
+        |      }}}]
+        |  }
+      """.stripMargin
+
+    val json1: String = """ {"name": "bob","age": 10,"address": ["32", "42"]} """
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val record = testUtil.jsonToAvro(json1, schema1)
+
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
+    writer.write(record)
+    writer.close()
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(id struct<b:decimal(4,3)>) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+    checkExistence(sql("select * from sdkOutputTable"), true, "32.0")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b588cb65/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 64dfd42..7c1d9a2 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -18,6 +18,8 @@
 package org.apache.carbondata.sdk.file;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -29,6 +31,7 @@ import java.util.UUID;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import 
org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -46,6 +49,7 @@ import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobID;
@@ -103,42 +107,6 @@ public class AvroCarbonWriter extends CarbonWriter {
     Schema.Type type = avroField.schema().getType();
     LogicalType logicalType = avroField.schema().getLogicalType();
     switch (type) {
-      case INT:
-        if (logicalType != null) {
-          if (logicalType instanceof LogicalTypes.Date) {
-            int dateIntValue = (int) fieldValue;
-            out = dateIntValue * DateDirectDictionaryGenerator.MILLIS_PER_DAY;
-          } else {
-            LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName());
-            out = fieldValue;
-          }
-        } else {
-          out = fieldValue;
-        }
-        break;
-      case BOOLEAN:
-      case LONG:
-        if (logicalType != null && !(logicalType instanceof LogicalTypes.TimestampMillis)) {
-          if (logicalType instanceof LogicalTypes.TimestampMicros) {
-            long dateIntValue = (long) fieldValue;
-            out = dateIntValue / 1000L;
-          } else {
-            LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName());
-            out = fieldValue;
-          }
-        } else {
-          out = fieldValue;
-        }
-        break;
-      case DOUBLE:
-      case STRING:
-        out = fieldValue;
-        break;
-      case FLOAT:
-        // direct conversion will change precision. So parse from string.
-        // also carbon internally needs float as double
-        out = Double.parseDouble(fieldValue.toString());
-        break;
       case MAP:
         // Note: Avro object takes care of removing the duplicates so we should not handle it again
         // Map will be internally stored as Array<Struct<Key,Value>>
@@ -213,6 +181,124 @@ public class AvroCarbonWriter extends CarbonWriter {
         }
         out = new ArrayObject(arrayChildObjects);
         break;
+      case UNION:
+        // Union type will be internally stored as Struct<col:type>
+        // Fill the data object only for the union branch whose type matches the field value;
+        // all other branches are filled with null
+        List<Schema> unionFields = avroField.schema().getTypes();
+        int notNullUnionFieldsCount = 0;
+        for (Schema unionField : unionFields) {
+          if (!unionField.getType().equals(Schema.Type.NULL)) {
+            notNullUnionFieldsCount++;
+          }
+        }
+        Object[] values = new Object[notNullUnionFieldsCount];
+        int j = 0;
+        for (Schema unionField : unionFields) {
+          if (unionField.getType().equals(Schema.Type.NULL)) {
+            continue;
+          }
+          if (checkFieldValueType(unionField.getType(), fieldValue)) {
+            values[j] = avroFieldToObjectForUnionType(unionField, fieldValue, avroField);
+            break;
+          }
+          j++;
+        }
+        out = new StructObject(values);
+        break;
+      default:
+        out = avroPrimitiveFieldToObject(type, logicalType, fieldValue);
+    }
+    return out;
+  }
+
+  /**
+   * For Union type, check whether the given field value is an instance of the
+   * Java type corresponding to the given Avro schema type
+   *
+   * @param type avro schema type of one union branch
+   * @param fieldValue field value read from the avro record
+   * @return true if the value matches the given schema type
+   */
+  private boolean checkFieldValueType(Schema.Type type, Object fieldValue) {
+    switch (type) {
+      case INT:
+        return (fieldValue instanceof Integer);
+      case BOOLEAN:
+        return (fieldValue instanceof Boolean);
+      case LONG:
+        return (fieldValue instanceof Long);
+      case DOUBLE:
+        return (fieldValue instanceof Double);
+      case STRING:
+        return (fieldValue instanceof Utf8 || fieldValue instanceof String);
+      case FLOAT:
+        return (fieldValue instanceof Float);
+      case RECORD:
+        return (fieldValue instanceof GenericData.Record);
+      case ARRAY:
+        return (fieldValue instanceof GenericData.Array || fieldValue instanceof ArrayList);
+      case BYTES:
+        return (fieldValue instanceof ByteBuffer);
+      case MAP:
+        return (fieldValue instanceof HashMap);
+      case ENUM:
+        return (fieldValue instanceof GenericData.EnumSymbol);
+      default:
+        return false;
+    }
+  }
+
+  private Object avroPrimitiveFieldToObject(Schema.Type type, LogicalType logicalType,
+      Object fieldValue) {
+    Object out = null;
+    switch (type) {
+      case INT:
+        if (logicalType != null) {
+          if (logicalType instanceof LogicalTypes.Date) {
+            int dateIntValue = (int) fieldValue;
+            out = dateIntValue * DateDirectDictionaryGenerator.MILLIS_PER_DAY;
+          } else {
+            LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName());
+            out = fieldValue;
+          }
+        } else {
+          out = fieldValue;
+        }
+        break;
+      case BOOLEAN:
+      case LONG:
+        if (logicalType != null && !(logicalType instanceof LogicalTypes.TimestampMillis)) {
+          if (logicalType instanceof LogicalTypes.TimestampMicros) {
+            long dateIntValue = (long) fieldValue;
+            out = dateIntValue / 1000L;
+          } else {
+            LOGGER.warn("Actual type: INT, Logical Type: " + logicalType.getName());
+            out = fieldValue;
+          }
+        } else {
+          out = fieldValue;
+        }
+        break;
+      case DOUBLE:
+      case STRING:
+      case ENUM:
+        out = fieldValue;
+        break;
+      case FLOAT:
+        // direct conversion will change precision. So parse from string.
+        // also carbon internally needs float as double
+        out = Double.parseDouble(fieldValue.toString());
+        break;
+      case BYTES:
+        // DECIMAL type is defined in Avro as a BYTE type with the logicalType property
+        // set to "decimal" and a specified precision and scale
+        // As binary type is not supported yet, value will be null
+        if (logicalType instanceof LogicalTypes.Decimal) {
+          out = new BigDecimal(new String(((ByteBuffer) fieldValue).array(),
+              CarbonCommonConstants.DEFAULT_CHARSET_CLASS));
+        }
+        break;
       case NULL:
         out = null;
         break;
@@ -224,6 +310,110 @@ public class AvroCarbonWriter extends CarbonWriter {
   }
 
   /**
+   * Fill the data object for one union branch, based on the branch schema
+   * and the actual field value
+   *
+   * @param avroField schema of the union branch
+   * @param fieldValue field value read from the avro record
+   * @param avroFields enclosing avro field, used for its name, doc and default value
+   * @return converted carbon object for the branch, or null if the value does not match
+   */
+  private Object avroFieldToObjectForUnionType(Schema avroField, Object 
fieldValue,
+      Schema.Field avroFields) {
+    Object out;
+    Schema.Type type = avroField.getType();
+    LogicalType logicalType = avroField.getLogicalType();
+    switch (type) {
+      case RECORD:
+        if (fieldValue instanceof GenericData.Record) {
+          List<Schema.Field> fields = avroField.getFields();
+
+          Object[] structChildObjects = new Object[fields.size()];
+          for (int i = 0; i < fields.size(); i++) {
+            Object childObject =
+                avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i));
+            if (childObject != null) {
+              structChildObjects[i] = childObject;
+            }
+          }
+          out = new StructObject(structChildObjects);
+        } else {
+          out = null;
+        }
+        break;
+      case ARRAY:
+        if (fieldValue instanceof GenericData.Array || fieldValue instanceof ArrayList) {
+          Object[] arrayChildObjects;
+          if (fieldValue instanceof GenericData.Array) {
+            int size = ((GenericData.Array) fieldValue).size();
+            arrayChildObjects = new Object[size];
+            for (int i = 0; i < size; i++) {
+              Object childObject = avroFieldToObject(
+                  new Schema.Field(avroFields.name(), avroField.getElementType(), avroFields.doc(),
+                      avroFields.defaultVal()), ((GenericData.Array) fieldValue).get(i));
+              if (childObject != null) {
+                arrayChildObjects[i] = childObject;
+              }
+            }
+          } else {
+            int size = ((ArrayList) fieldValue).size();
+            arrayChildObjects = new Object[size];
+            for (int i = 0; i < size; i++) {
+              Object childObject = avroFieldToObject(
+                  new Schema.Field(avroFields.name(), avroField.getElementType(), avroFields.doc(),
+                      avroFields.defaultVal()), ((ArrayList) fieldValue).get(i));
+              if (childObject != null) {
+                arrayChildObjects[i] = childObject;
+              }
+            }
+          }
+          out = new ArrayObject(arrayChildObjects);
+        } else {
+          out = null;
+        }
+        break;
+      case MAP:
+        // Note: Avro object takes care of removing the duplicates so we should not handle it again
+        // Map will be internally stored as Array<Struct<Key,Value>>
+        if (fieldValue instanceof HashMap) {
+          Map mapEntries = (HashMap) fieldValue;
+          Object[] arrayMapChildObjects = new Object[mapEntries.size()];
+          if (!mapEntries.isEmpty()) {
+            Iterator iterator = mapEntries.entrySet().iterator();
+            int counter = 0;
+            while (iterator.hasNext()) {
+              // size is 2 because map will have key and value
+              Object[] mapChildObjects = new Object[2];
+              Map.Entry mapEntry = (HashMap.Entry) iterator.next();
+              // evaluate key
+              Object keyObject = avroFieldToObject(
+                  new Schema.Field(avroFields.name(), Schema.create(Schema.Type.STRING),
+                      avroFields.doc(), avroFields.defaultVal()), mapEntry.getKey());
+              // evaluate value
+              Object valueObject = avroFieldToObject(
+                  new Schema.Field(avroFields.name(), avroField.getValueType(), avroFields.doc(),
+                      avroFields.defaultVal()), mapEntry.getValue());
+              if (keyObject != null) {
+                mapChildObjects[0] = keyObject;
+              }
+              if (valueObject != null) {
+                mapChildObjects[1] = valueObject;
+              }
+              StructObject keyValueObject = new StructObject(mapChildObjects);
+              arrayMapChildObjects[counter++] = keyValueObject;
+            }
+          }
+          out = new ArrayObject(arrayMapChildObjects);
+        } else {
+          out = null;
+        }
+        break;
+      default:
+        out = avroPrimitiveFieldToObject(type, logicalType, fieldValue);
+    }
+    return out;
+  }
+
+  /**
    * converts avro schema to carbon schema required by carbonWriter
    *
    * @param avroSchema avro schema
@@ -270,6 +460,7 @@ public class AvroCarbonWriter extends CarbonWriter {
         }
       case DOUBLE:
         return new Field(fieldName, DataTypes.DOUBLE);
+      case ENUM:
       case STRING:
         return new Field(fieldName, DataTypes.STRING);
       case FLOAT:
@@ -310,6 +501,32 @@ public class AvroCarbonWriter extends CarbonWriter {
         } else {
           return null;
         }
+      case UNION:
+        int i = 0;
+        // Get union types and store as Struct<type>
+        ArrayList<StructField> unionFields = new ArrayList<>();
+        for (Schema avroSubField : avroField.schema().getTypes()) {
+          if (!avroSubField.getType().equals(Schema.Type.NULL)) {
+            StructField unionField = prepareSubFields(avroField.name() + i++, avroSubField);
+            if (unionField != null) {
+              unionFields.add(unionField);
+            }
+          }
+        }
+        if (unionFields.isEmpty()) {
+          throw new UnsupportedOperationException(
+              "Carbon do not support Avro UNION with only null type");
+        }
+        return new Field(fieldName, "struct", unionFields);
+      case BYTES:
+        // DECIMAL type is defined in Avro as a BYTE type with the logicalType property
+        // set to "decimal" and a specified precision and scale
+        if (logicalType instanceof LogicalTypes.Decimal) {
+          int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision();
+          int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale();
+          return new Field(fieldName, DataTypes.createDecimalType(precision, scale));
+        }
+        return null;
       case NULL:
         return null;
       default:
@@ -343,6 +560,7 @@ public class AvroCarbonWriter extends CarbonWriter {
         }
       case DOUBLE:
         return new StructField(fieldName, DataTypes.DOUBLE);
+      case ENUM:
       case STRING:
         return new StructField(fieldName, DataTypes.STRING);
       case FLOAT:
@@ -385,6 +603,26 @@ public class AvroCarbonWriter extends CarbonWriter {
         } else {
           return null;
         }
+      case UNION:
+        // recursively get the union types
+        int i = 0;
+        ArrayList<StructField> structSubTypes = new ArrayList<>();
+        for (Schema avroSubField : childSchema.getTypes()) {
+          StructField structField = prepareSubFields(fieldName + i++, avroSubField);
+          if (structField != null) {
+            structSubTypes.add(structField);
+          }
+        }
+        return (new StructField(fieldName, DataTypes.createStructType(structSubTypes)));
+      case BYTES:
+        // DECIMAL type is defined in Avro as a BYTE type with the logicalType property
+        // set to "decimal" and a specified precision and scale
+        if (logicalType instanceof LogicalTypes.Decimal) {
+          int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision();
+          int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale();
+          return new StructField(fieldName, DataTypes.createDecimalType(precision, scale));
+        }
+        return null;
       case NULL:
         return null;
       default:
@@ -425,6 +663,7 @@ public class AvroCarbonWriter extends CarbonWriter {
         }
       case DOUBLE:
         return DataTypes.DOUBLE;
+      case ENUM:
       case STRING:
         return DataTypes.STRING;
       case FLOAT:
@@ -457,6 +696,26 @@ public class AvroCarbonWriter extends CarbonWriter {
         } else {
           return null;
         }
+      case UNION:
+        int i = 0;
+        // recursively get the union types and create struct type
+        ArrayList<StructField> unionFields = new ArrayList<>();
+        for (Schema avroSubField : childSchema.getTypes()) {
+          StructField unionField = prepareSubFields(avroSubField.getName() + i++, avroSubField);
+          if (unionField != null) {
+            unionFields.add(unionField);
+          }
+        }
+        return DataTypes.createStructType(unionFields);
+      case BYTES:
+        // DECIMAL type is defined in Avro as a BYTE type with the logicalType property
+        // set to "decimal" and a specified precision and scale
+        if (logicalType instanceof LogicalTypes.Decimal) {
+          int precision = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getPrecision();
+          int scale = ((LogicalTypes.Decimal) childSchema.getLogicalType()).getScale();
+          return DataTypes.createDecimalType(precision, scale);
+        }
+        return null;
       case NULL:
         return null;
       default:
