[CARBONDATA-2443][SDK] Multi level complex type support for AVRO based SDK

Problem:
Problem inferring the complex type schema with boolean array type from the 
store created using SDK writer

Analysis:
When we create an external table and infer the schema from store created using 
SDK writer, the operation fails because of complex type field with boolean 
array dataType. This is because during schema creation by SDK writer, for array 
type children a child with column name val is added.
While parsing the logic to append the parent name with child column name is 
missing for boolean type which is causing this problem.

Solution:
Handle the parsing for boolean type

This closes #2294


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/35a7b5e9
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/35a7b5e9
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/35a7b5e9

Branch: refs/heads/spark-2.3
Commit: 35a7b5e9af5cabe85794274e07cab9a6bbbbc53f
Parents: ff5166e
Author: manishgupta88 <tomanishgupt...@gmail.com>
Authored: Thu May 10 17:09:17 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu May 10 20:49:28 2018 +0530

----------------------------------------------------------------------
 .../schema/table/TableSchemaBuilder.java        | 33 ++++++++------------
 .../schema/table/TableSchemaBuilderSuite.java   | 13 +++++---
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  5 +++
 .../sdk/file/CarbonWriterBuilder.java           | 13 ++++++--
 4 files changed, 36 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/35a7b5e9/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index ca082e1..b078400 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
@@ -114,12 +115,12 @@ public class TableSchemaBuilder {
     this.sortColumns = sortColumns;
   }
 
-  public ColumnSchema addColumn(StructField field, boolean isSortColumn) {
-    return addColumn(field, null, isSortColumn, false);
+  public ColumnSchema addColumn(StructField field, AtomicInteger valIndex, 
boolean isSortColumn) {
+    return addColumn(field, null, valIndex, isSortColumn, false);
   }
 
-  private ColumnSchema addColumn(StructField field, String parentName, boolean 
isSortColumn,
-      boolean isComplexChild) {
+  private ColumnSchema addColumn(StructField field, String parentName, 
AtomicInteger valIndex,
+      boolean isSortColumn, boolean isComplexChild) {
     Objects.requireNonNull(field);
     checkRepeatColumnName(field);
     ColumnSchema newColumn = new ColumnSchema();
@@ -184,33 +185,25 @@ public class TableSchemaBuilder {
     if (field.getDataType().isComplexType()) {
       String parentFieldName = newColumn.getColumnName();
       if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
-        String colName = getColNameForArray(parentFieldName);
-        addColumn(new StructField(colName,
-            ((ArrayType) field.getDataType()).getElementType()), 
field.getFieldName(), false, true);
+        String colName = getColNameForArray(valIndex);
+        addColumn(new StructField(colName, ((ArrayType) 
field.getDataType()).getElementType()),
+            field.getFieldName(), valIndex, false, true);
       } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")
           && ((StructType) field.getDataType()).getFields().size() > 0) {
         // This field has children.
         List<StructField> fields = ((StructType) 
field.getDataType()).getFields();
         for (int i = 0; i < fields.size(); i++) {
-          addColumn(fields.get(i), parentFieldName, false, true);
+          addColumn(fields.get(i), parentFieldName, valIndex, false, true);
         }
       }
     }
     return newColumn;
   }
 
-  private String getColNameForArray(String parentFieldName) {
-    if (!parentFieldName.endsWith(".val")) {
-      return "val";
-    } else {
-      String[] splits = parentFieldName.split("val");
-      if (splits.length == 1) {
-        return "val" + 1;
-      } else {
-        return "val" + (Integer.parseInt(parentFieldName
-            .substring(parentFieldName.lastIndexOf("val") + 3, 
parentFieldName.length())) + 1);
-      }
-    }
+  private String getColNameForArray(AtomicInteger valIndex) {
+    String colName = "val" + valIndex.get();
+    valIndex.incrementAndGet();
+    return colName;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/35a7b5e9/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java
 
b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java
index e9dce94..48e5d1b 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilderSuite.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.metadata.schema.table;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.StructField;
@@ -32,15 +33,16 @@ public class TableSchemaBuilderSuite {
   @Test(expected = NullPointerException.class)
   public void testNullField() {
     TableSchemaBuilder builder = TableSchema.builder();
-    builder.addColumn(null, true);
+    builder.addColumn(null, new AtomicInteger(0), true);
   }
 
   @Test
   public void testBuilder() {
     TableSchemaBuilder builder = TableSchema.builder();
-    ColumnSchema columnSchema = builder.addColumn(new StructField("a", 
DataTypes.INT), true);
+    ColumnSchema columnSchema =
+        builder.addColumn(new StructField("a", DataTypes.INT), new 
AtomicInteger(0), true);
     builder.setSortColumns(Arrays.asList(columnSchema));
-    builder.addColumn(new StructField("b", DataTypes.DOUBLE), false);
+    builder.addColumn(new StructField("b", DataTypes.DOUBLE), new 
AtomicInteger(0), false);
     TableSchema schema = builder.build();
     Assert.assertEquals(2, schema.getListOfColumns().size());
     List<ColumnSchema> columns = schema.getListOfColumns();
@@ -51,9 +53,10 @@ public class TableSchemaBuilderSuite {
   @Test(expected = IllegalArgumentException.class)
   public void testRepeatedColumn() {
     TableSchemaBuilder builder = TableSchema.builder();
-    ColumnSchema columnSchema = builder.addColumn(new StructField("a", 
DataTypes.INT), true);
+    ColumnSchema columnSchema =
+        builder.addColumn(new StructField("a", DataTypes.INT), new 
AtomicInteger(0), true);
     builder.setSortColumns(Arrays.asList(columnSchema));
-    builder.addColumn(new StructField("a", DataTypes.DOUBLE), false);
+    builder.addColumn(new StructField("a", DataTypes.DOUBLE), new 
AtomicInteger(0), false);
     TableSchema schema = builder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/35a7b5e9/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 890f8fc..9bc5597 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -1147,6 +1147,9 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
       case "decimal" => Field(field.column, Some("Decimal"), field.name, 
Some(null), field.parent,
         field.storeType, field.schemaOrdinal, field.precision, field.scale, 
field.rawSchema,
         field.columnComment)
+      case "boolean" => Field(field.column, Some("Boolean"), field.name, 
Some(null), field.parent,
+        field.storeType, field.schemaOrdinal, field.precision, field.scale, 
field.rawSchema,
+        field.columnComment)
       // checking if the nested data type contains the child type as 
decimal(10,0),
       // if it is present then extracting the precision and scale. resetting 
the data type
       // with Decimal.
@@ -1214,6 +1217,8 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
       case "Decimal" => Field(parentName + "." + field.column, Some("Decimal"),
         Some(parentName + "." + field.name.getOrElse(None)), Some(null), 
parentName,
         field.storeType, field.schemaOrdinal, field.precision, field.scale)
+      case "Boolean" => Field(parentName + "." + field.column, Some("Boolean"),
+        Some(parentName + "." + field.name.getOrElse(None)), Some(null), 
parentName)
       case _ => field
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/35a7b5e9/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 76a46d0..f541dbb 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -426,6 +427,10 @@ public class CarbonWriterBuilder {
 
   private void buildTableSchema(Field[] fields, TableSchemaBuilder 
tableSchemaBuilder,
       List<String> sortColumnsList, ColumnSchema[] sortColumnsSchemaList) {
+    // a counter which will be used in case of complex array type. This 
valIndex will be assigned
+    // to child of complex array type in the order val1, val2 so that each 
array type child is
+    // differentiated to any level
+    AtomicInteger valIndex = new AtomicInteger(0);
     for (Field field : fields) {
       if (null != field) {
         int isSortColumn = sortColumnsList.indexOf(field.getFieldName());
@@ -443,7 +448,8 @@ public class CarbonWriterBuilder {
             // Loop through the inner columns and for a StructData
             DataType complexType =
                 
DataTypes.createArrayType(field.getChildren().get(0).getDataType());
-            tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), 
complexType), false);
+            tableSchemaBuilder
+                .addColumn(new StructField(field.getFieldName(), complexType), 
valIndex, false);
           } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")) 
{
             // Loop through the inner columns and for a StructData
             List<StructField> structFieldsArray =
@@ -453,12 +459,13 @@ public class CarbonWriterBuilder {
                   .add(new StructField(childFld.getFieldName(), 
childFld.getDataType()));
             }
             DataType complexType = 
DataTypes.createStructType(structFieldsArray);
-            tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), 
complexType), false);
+            tableSchemaBuilder
+                .addColumn(new StructField(field.getFieldName(), complexType), 
valIndex, false);
           }
         } else {
           ColumnSchema columnSchema = tableSchemaBuilder
               .addColumn(new StructField(field.getFieldName(), 
field.getDataType()),
-                  isSortColumn > -1);
+                  valIndex, isSortColumn > -1);
           columnSchema.setSortColumn(true);
           if (isSortColumn > -1) {
             sortColumnsSchemaList[isSortColumn] = columnSchema;

Reply via email to