This is an automated email from the ASF dual-hosted git repository.

shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 313faa5  [GOBBLIN-571] Fix parquet schema for complex types
313faa5 is described below

commit 313faa57617cc8241750990f423d407ed59f3b80
Author: tilakpatidar <[email protected]>
AuthorDate: Wed Dec 18 11:52:13 2019 -0800

    [GOBBLIN-571] Fix parquet schema for complex types
    
    Closes #2436 from tilakpatidar/parquet_schema_fix
---
 .../parquet/JsonElementConversionFactory.java      | 10 ++++----
 .../gobblin/converter/parquet/JsonSchema.java      |  5 +++-
 ...sonIntermediateToParquetGroupConverterTest.java | 21 +++++++++++++---
 .../JsonIntermediateToParquetConverter.json        | 28 +++++++++++++++++++---
 4 files changed, 52 insertions(+), 12 deletions(-)

diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
index 46b264a..288a5de 100644
--- a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
+++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
@@ -330,7 +330,7 @@ public class JsonElementConversionFactory {
 
     @Override
     JsonSchema getElementSchema() {
-      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(this.elementType);
+      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(this.elementType, true);
       jsonSchema.setColumnName(ARRAY_KEY);
       return jsonSchema;
     }
@@ -347,7 +347,7 @@ public class JsonElementConversionFactory {
 
     @Override
     Object convertField(JsonElement value) {
-      if (symbols.contains(value.getAsString()) || this.jsonSchema.isNullable()) {
+      if (symbols.contains(value.getAsString()) || (this.jsonSchema.isNullable() && value.isJsonNull())) {
         return this.elementConverter.convert(value);
       }
       throw new RuntimeException("Symbol " + value.getAsString() + " does not belong to set " + symbols.toString());
@@ -360,7 +360,7 @@ public class JsonElementConversionFactory {
 
     @Override
     JsonSchema getElementSchema() {
-      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING);
+      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING, this.jsonSchema.isNullable());
       jsonSchema.setColumnName(this.jsonSchema.getColumnName());
       return jsonSchema;
     }
@@ -475,13 +475,13 @@ public class JsonElementConversionFactory {
 
     @Override
     JsonSchema getElementSchema() {
-      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(this.elementType);
+      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(this.elementType, false);
       jsonSchema.setColumnName(MAP_VALUE_COLUMN_NAME);
       return jsonSchema;
     }
 
     public JsonElementConverter getKeyConverter() {
-      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING);
+      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING, false);
       jsonSchema.setColumnName(MAP_KEY_COLUMN_NAME);
       return getConverter(jsonSchema, false);
     }
diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
index dc650a7..626551d 100644
--- a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
+++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
@@ -21,6 +21,7 @@ import org.apache.gobblin.source.extractor.schema.Schema;
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
 
 import parquet.schema.Type.Repetition;
 
@@ -108,14 +109,16 @@ public class JsonSchema extends Schema {
   /**
    * Builds a {@link JsonSchema} object for a given {@link InputType} object.
    * @param type
+   * @param isNullable
    * @return
    */
-  public static JsonSchema buildBaseSchema(InputType type) {
+  public static JsonSchema buildBaseSchema(InputType type, boolean isNullable) {
     JsonObject jsonObject = new JsonObject();
     JsonObject dataType = new JsonObject();
     jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
     dataType.addProperty(TYPE_KEY, type.toString());
     jsonObject.add(DATA_TYPE_KEY, dataType);
+    jsonObject.add(IS_NULLABLE_KEY, new JsonPrimitive(isNullable));
     return new JsonSchema(jsonObject);
   }
 
diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
index c92834b..da9a7ce 100644
--- a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
+++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
@@ -66,7 +66,6 @@ public class JsonIntermediateToParquetGroupConverterTest {
     MessageType schema = parquetConverter.convertSchema(test.get("schema").getAsJsonArray(), workUnit);
     Group record =
         parquetConverter.convertRecord(schema, test.get("record").getAsJsonObject(), workUnit).iterator().next();
-
     assertEqualsIgnoreSpaces(schema.toString(), test.get("expectedSchema").getAsString());
     assertEqualsIgnoreSpaces(record.toString(), test.get("expectedRecord").getAsString());
   }
@@ -74,7 +73,7 @@ public class JsonIntermediateToParquetGroupConverterTest {
   @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Symbol .* does not belong to set \\[.*?\\]")
   public void testEnumTypeBelongsToEnumSet()
       throws Exception {
-    JsonObject test = testCases.get("enum").getAsJsonObject();
+    JsonObject test = deepCopy(testCases.get("enum").getAsJsonObject(), JsonObject.class);
     parquetConverter = new JsonIntermediateToParquetGroupConverter();
 
     MessageType schema = parquetConverter.convertSchema(test.get("schema").getAsJsonArray(), workUnit);
@@ -97,12 +96,18 @@ public class JsonIntermediateToParquetGroupConverterTest {
   }
 
   @Test
-  public void testEnumType()
+  public void testEnumTypeWithNullableTrue()
       throws Exception {
     testCase("enum");
   }
 
   @Test
+  public void testEnumTypeWithNullableFalse()
+      throws Exception {
+    testCase("enum1");
+  }
+
+  @Test
   public void testRecordType()
       throws Exception {
     testCase("record");
@@ -125,4 +130,14 @@ public class JsonIntermediateToParquetGroupConverterTest {
     assertEquals(actual.replaceAll("\\n", ";").replaceAll("\\s|\\t", ""),
         expected.replaceAll("\\n", ";").replaceAll("\\s|\\t", ""));
   }
+
+  public <T> T deepCopy(T object, Class<T> type) {
+    try {
+      Gson gson = new Gson();
+      return gson.fromJson(gson.toJson(object, type), type);
+    } catch (Exception e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
 }
diff --git a/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json b/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json
index a588ea9..bbd7344 100644
--- a/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json
+++ b/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json
@@ -94,7 +94,8 @@
         "dataType": {
           "type": "array",
           "items": "int"
-        }
+        },
+        "isNullable": true
       },
       {
         "columnName": "somearray1",
@@ -133,7 +134,7 @@
       }
     ],
     "expectedRecord": "somearray ;  item:1 ;  item:2 ;  item:3 ; somearray1 ;  item:1 ;  item:2 ;  item:3 ; somearray2 ;  item:1.0 ;  item:2.0 ;  item:3.0 ; somearray3 ;  item:1.0 ;  item:2.0 ;  item:3.0 ; somearray4 ;  item:true ;  item:false ;  item:true ; somearray5 ;  item:hello ;  item:world ; ",
-    "expectedSchema": "message test_table {  ;  required group somearray {  ; repeated int32 item ;  ; } ;  required groupsomearray1 {  ; repeated int64 item ;  ; } ;  required groupsomearray2 {  ; repeated float item ;  ; } ;  required groupsomearray3 {  ; repeated double item ;  ; } ;  required groupsomearray4 {  ; repeated boolean item ;  ; } ;  required groupsomearray5 {  ; repeated binary item(UTF8) ;  ; } ; } ; "
+    "expectedSchema": "message test_table {  ;  optional group somearray {  ; repeated int32 item ;  ; } ;  required groupsomearray1 {  ; repeated int64 item ;  ; } ;  required groupsomearray2 {  ; repeated float item ;  ; } ;  required groupsomearray3 {  ; repeated double item ;  ; } ;  required groupsomearray4 {  ; repeated boolean item ;  ; } ;  required groupsomearray5 {  ; repeated binary item(UTF8) ;  ; } ; } ; "
   },
   "enum": {
     "record": {
@@ -148,7 +149,28 @@
             "HELLO",
             "WORLD"
           ]
-        }
+        },
+        "isNullable": true
+      }
+    ],
+    "expectedRecord": "some_enum : HELLO ;",
+    "expectedSchema": "message test_table { ; optional binary some_enum (UTF8) ;; } ;"
+  },
+  "enum1": {
+    "record": {
+      "some_enum": "HELLO"
+    },
+    "schema": [
+      {
+        "columnName": "some_enum",
+        "dataType": {
+          "type": "enum",
+          "symbols": [
+            "HELLO",
+            "WORLD"
+          ]
+        },
+        "isNullable": false
       }
     ],
     "expectedRecord": "some_enum : HELLO ;",

Reply via email to