This is an automated email from the ASF dual-hosted git repository.
shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 313faa5 [GOBBLIN-571] Fix parquet schema for complex types
313faa5 is described below
commit 313faa57617cc8241750990f423d407ed59f3b80
Author: tilakpatidar <[email protected]>
AuthorDate: Wed Dec 18 11:52:13 2019 -0800
[GOBBLIN-571] Fix parquet schema for complex types
Closes #2436 from tilakpatidar/parquet_schema_fix
---
.../parquet/JsonElementConversionFactory.java | 10 ++++----
.../gobblin/converter/parquet/JsonSchema.java | 5 +++-
...sonIntermediateToParquetGroupConverterTest.java | 21 +++++++++++++---
.../JsonIntermediateToParquetConverter.json | 28 +++++++++++++++++++---
4 files changed, 52 insertions(+), 12 deletions(-)
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
index 46b264a..288a5de 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
+++
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java
@@ -330,7 +330,7 @@ public class JsonElementConversionFactory {
@Override
JsonSchema getElementSchema() {
- JsonSchema jsonSchema = JsonSchema.buildBaseSchema(this.elementType);
+ JsonSchema jsonSchema = JsonSchema.buildBaseSchema(this.elementType,
true);
jsonSchema.setColumnName(ARRAY_KEY);
return jsonSchema;
}
@@ -347,7 +347,7 @@ public class JsonElementConversionFactory {
@Override
Object convertField(JsonElement value) {
- if (symbols.contains(value.getAsString()) ||
this.jsonSchema.isNullable()) {
+ if (symbols.contains(value.getAsString()) ||
(this.jsonSchema.isNullable() && value.isJsonNull())) {
return this.elementConverter.convert(value);
}
throw new RuntimeException("Symbol " + value.getAsString() + " does not
belong to set " + symbols.toString());
@@ -360,7 +360,7 @@ public class JsonElementConversionFactory {
@Override
JsonSchema getElementSchema() {
- JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING);
+ JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING,
this.jsonSchema.isNullable());
jsonSchema.setColumnName(this.jsonSchema.getColumnName());
return jsonSchema;
}
@@ -475,13 +475,13 @@ public class JsonElementConversionFactory {
@Override
JsonSchema getElementSchema() {
- JsonSchema jsonSchema = JsonSchema.buildBaseSchema(this.elementType);
+ JsonSchema jsonSchema = JsonSchema.buildBaseSchema(this.elementType,
false);
jsonSchema.setColumnName(MAP_VALUE_COLUMN_NAME);
return jsonSchema;
}
public JsonElementConverter getKeyConverter() {
- JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING);
+ JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING, false);
jsonSchema.setColumnName(MAP_KEY_COLUMN_NAME);
return getConverter(jsonSchema, false);
}
diff --git
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
index dc650a7..626551d 100644
---
a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
+++
b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java
@@ -21,6 +21,7 @@ import org.apache.gobblin.source.extractor.schema.Schema;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
import parquet.schema.Type.Repetition;
@@ -108,14 +109,16 @@ public class JsonSchema extends Schema {
/**
* Builds a {@link JsonSchema} object for a given {@link InputType} object.
* @param type
+ * @param isNullable
* @return
*/
- public static JsonSchema buildBaseSchema(InputType type) {
+ public static JsonSchema buildBaseSchema(InputType type, boolean isNullable)
{
JsonObject jsonObject = new JsonObject();
JsonObject dataType = new JsonObject();
jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME);
dataType.addProperty(TYPE_KEY, type.toString());
jsonObject.add(DATA_TYPE_KEY, dataType);
+ jsonObject.add(IS_NULLABLE_KEY, new JsonPrimitive(isNullable));
return new JsonSchema(jsonObject);
}
diff --git
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
index c92834b..da9a7ce 100644
---
a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
+++
b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java
@@ -66,7 +66,6 @@ public class JsonIntermediateToParquetGroupConverterTest {
MessageType schema =
parquetConverter.convertSchema(test.get("schema").getAsJsonArray(), workUnit);
Group record =
parquetConverter.convertRecord(schema,
test.get("record").getAsJsonObject(), workUnit).iterator().next();
-
assertEqualsIgnoreSpaces(schema.toString(),
test.get("expectedSchema").getAsString());
assertEqualsIgnoreSpaces(record.toString(),
test.get("expectedRecord").getAsString());
}
@@ -74,7 +73,7 @@ public class JsonIntermediateToParquetGroupConverterTest {
@Test(expectedExceptions = RuntimeException.class,
expectedExceptionsMessageRegExp = "Symbol .* does not belong to set \\[.*?\\]")
public void testEnumTypeBelongsToEnumSet()
throws Exception {
- JsonObject test = testCases.get("enum").getAsJsonObject();
+ JsonObject test = deepCopy(testCases.get("enum").getAsJsonObject(),
JsonObject.class);
parquetConverter = new JsonIntermediateToParquetGroupConverter();
MessageType schema =
parquetConverter.convertSchema(test.get("schema").getAsJsonArray(), workUnit);
@@ -97,12 +96,18 @@ public class JsonIntermediateToParquetGroupConverterTest {
}
@Test
- public void testEnumType()
+ public void testEnumTypeWithNullableTrue()
throws Exception {
testCase("enum");
}
@Test
+ public void testEnumTypeWithNullableFalse()
+ throws Exception {
+ testCase("enum1");
+ }
+
+ @Test
public void testRecordType()
throws Exception {
testCase("record");
@@ -125,4 +130,14 @@ public class JsonIntermediateToParquetGroupConverterTest {
assertEquals(actual.replaceAll("\\n", ";").replaceAll("\\s|\\t", ""),
expected.replaceAll("\\n", ";").replaceAll("\\s|\\t", ""));
}
+
+ public <T> T deepCopy(T object, Class<T> type) {
+ try {
+ Gson gson = new Gson();
+ return gson.fromJson(gson.toJson(object, type), type);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
}
diff --git
a/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json
b/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json
index a588ea9..bbd7344 100644
---
a/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json
+++
b/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json
@@ -94,7 +94,8 @@
"dataType": {
"type": "array",
"items": "int"
- }
+ },
+ "isNullable": true
},
{
"columnName": "somearray1",
@@ -133,7 +134,7 @@
}
],
"expectedRecord": "somearray ; item:1 ; item:2 ; item:3 ; somearray1 ;
item:1 ; item:2 ; item:3 ; somearray2 ; item:1.0 ; item:2.0 ; item:3.0 ;
somearray3 ; item:1.0 ; item:2.0 ; item:3.0 ; somearray4 ; item:true ;
item:false ; item:true ; somearray5 ; item:hello ; item:world ; ",
- "expectedSchema": "message test_table { ; required group somearray { ;
repeated int32 item ; ; } ; required groupsomearray1 { ; repeated int64 item
; ; } ; required groupsomearray2 { ; repeated float item ; ; } ; required
groupsomearray3 { ; repeated double item ; ; } ; required groupsomearray4 {
; repeated boolean item ; ; } ; required groupsomearray5 { ; repeated binary
item(UTF8) ; ; } ; } ; "
+ "expectedSchema": "message test_table { ; optional group somearray { ;
repeated int32 item ; ; } ; required groupsomearray1 { ; repeated int64 item
; ; } ; required groupsomearray2 { ; repeated float item ; ; } ; required
groupsomearray3 { ; repeated double item ; ; } ; required groupsomearray4 {
; repeated boolean item ; ; } ; required groupsomearray5 { ; repeated binary
item(UTF8) ; ; } ; } ; "
},
"enum": {
"record": {
@@ -148,7 +149,28 @@
"HELLO",
"WORLD"
]
- }
+ },
+ "isNullable": true
+ }
+ ],
+ "expectedRecord": "some_enum : HELLO ;",
+ "expectedSchema": "message test_table { ; optional binary some_enum (UTF8)
;; } ;"
+ },
+ "enum1": {
+ "record": {
+ "some_enum": "HELLO"
+ },
+ "schema": [
+ {
+ "columnName": "some_enum",
+ "dataType": {
+ "type": "enum",
+ "symbols": [
+ "HELLO",
+ "WORLD"
+ ]
+ },
+ "isNullable": false
}
],
"expectedRecord": "some_enum : HELLO ;",