This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch avro_1_9
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/avro_1_9 by this push:
     new cae40b8  Changes for upgrade Avro 1.9.2 and leverges hive with avro 
changes from https://linkedin.jfrog.io/artifactory/gobblin-hive (#3458)
cae40b8 is described below

commit cae40b87ce7846779380c0d000513ade31a4da0b
Author: Abhishek Nath <[email protected]>
AuthorDate: Tue Feb 15 09:19:08 2022 -0800

    Changes for upgrade Avro 1.9.2 and leverges hive with avro changes from 
https://linkedin.jfrog.io/artifactory/gobblin-hive (#3458)
---
 .../avro/MRCompactorAvroKeyDedupJobRunner.java     |   4 +-
 gobblin-core-base/build.gradle                     |   1 +
 .../converter/filter/AvroSchemaFieldRemover.java   |  13 +++++--
 ...GobblinTrackingEventFlattenFilterConverter.java |  10 +++--
 .../filter/AvroSchemaFieldRemoverTest.java         |  18 +++++++--
 ...linTrackingEventFlattenFilterConverterTest.java |  10 +++--
 .../resources/converter/recursive_schema_1.avsc    |   2 +-
 .../converter/recursive_schema_1_converted.avsc    |   2 +-
 .../resources/converter/recursive_schema_2.avsc    |   2 +-
 .../converter/recursive_schema_2_converted.avsc    |   2 +-
 .../recursive_schema_2_not_converted.avsc          |   2 +-
 gobblin-core/build.gradle                          |   1 +
 .../converter/avro/FlattenNestedKeyConverter.java  |   7 +++-
 .../avro/JsonElementConversionFactory.java         |   8 ++--
 .../converter/filter/AvroFieldsPickConverter.java  |   9 +++--
 .../filter/AvroFieldsPickConverterTest.java        |   2 +-
 .../AvroGenericRecordAccessorTest.java             |   5 +++
 .../src/test/resources/converter/complex3.json     |  12 +++---
 .../test/resources/converter/fieldPickInput.avsc   |   2 +-
 .../resources/converter/fieldPickInput_arrays.avro | Bin 552 -> 510 bytes
 gobblin-core/src/test/resources/serde/serde.avro   | Bin 277 -> 293 bytes
 gobblin-core/src/test/resources/serde/serde.avsc   |   4 +-
 .../AvroStringFieldEncryptorConverterTest.java     |   5 +++
 .../src/test/resources/fieldPickInput_arrays.avro  | Bin 552 -> 510 bytes
 gobblin-modules/gobblin-kafka-common/build.gradle  |   1 +
 .../converter/EnvelopePayloadConverter.java        |   8 +++-
 .../restli/throttling/PoliciesResourceTest.java    |   2 +-
 gobblin-utility/build.gradle                       |   2 +
 .../org/apache/gobblin/util/AvroFlattener.java     |   4 +-
 .../java/org/apache/gobblin/util/AvroUtils.java    |  42 +++++++++++++++------
 .../org/apache/gobblin/util/AvroFlattenerTest.java |   3 +-
 .../org/apache/gobblin/util/AvroUtilsTest.java     |  24 ++++++------
 .../optionWithinOptionWithinRecord_flattened.json  |   9 +++--
 .../recordWithinOptionWithinRecord_flattened.json  |   6 ++-
 gradle/scripts/defaultBuildProperties.gradle       |   4 +-
 gradle/scripts/dependencyDefinitions.gradle        |  14 ++++---
 gradle/scripts/repositories.gradle                 |   4 ++
 37 files changed, 161 insertions(+), 83 deletions(-)

diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
index 2a2ab6b..82c891d 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
@@ -37,6 +37,7 @@ import 
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
 import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapreduce.AvroJob;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.gobblin.compaction.dataset.Dataset;
 import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
@@ -198,7 +199,8 @@ public class MRCompactorAvroKeyDedupJobRunner extends 
MRCompactorJobRunner {
     for (Field field : record.getFields()) {
       Optional<Schema> newFieldSchema = getKeySchema(field);
       if (newFieldSchema.isPresent()) {
-        fields.add(new Field(field.name(), newFieldSchema.get(), field.doc(), 
field.defaultValue()));
+        fields.add(AvroCompatibilityHelper.createSchemaField(field.name(), 
newFieldSchema.get(), field.doc(),
+            AvroUtils.getCompatibleDefaultValue(field)));
       }
     }
     if (!fields.isEmpty()) {
diff --git a/gobblin-core-base/build.gradle b/gobblin-core-base/build.gradle
index fb7a4a7..83d1c9d 100644
--- a/gobblin-core-base/build.gradle
+++ b/gobblin-core-base/build.gradle
@@ -31,6 +31,7 @@ dependencies {
   compile externalDependency.avroMapredH2
   compile externalDependency.commonsCodec
   compile externalDependency.avro
+  compile externalDependency.avroCompatHelper
   compile externalDependency.guava
   compile externalDependency.slf4j
   compile externalDependency.typesafeConfig
diff --git 
a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
index d8d227b..be5c613 100644
--- 
a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
@@ -22,12 +22,15 @@ import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.gobblin.util.AvroUtils;
+
 import static org.apache.gobblin.util.AvroUtils.convertFieldToSchemaWithProps;
 
 
@@ -118,11 +121,13 @@ public class AvroSchemaFieldRemover {
       if (!this.shouldRemove(field)) {
         Field newField;
         if (this.children.containsKey(field.name())) {
-          newField = new Field(field.name(), 
this.children.get(field.name()).removeFields(field.schema(), schemaMap),
-              field.doc(), field.defaultValue());
+          newField = AvroCompatibilityHelper.createSchemaField(field.name(),
+              this.children.get(field.name()).removeFields(field.schema(), 
schemaMap),
+              field.doc(), AvroUtils.getCompatibleDefaultValue(field));
         } else {
-          newField = new Field(field.name(), 
DO_NOTHING_INSTANCE.removeFields(field.schema(), schemaMap), field.doc(),
-              field.defaultValue());
+          newField = AvroCompatibilityHelper.createSchemaField(field.name(),
+              DO_NOTHING_INSTANCE.removeFields(field.schema(), schemaMap), 
field.doc(),
+              AvroUtils.getCompatibleDefaultValue(field));
         }
         // Avro 1.9 compatible change - replaced deprecated public api 
getJsonProps with getObjectProps
         for (Map.Entry<String, Object> objectEntry : 
field.getObjectProps().entrySet()) {
diff --git 
a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
index bcad4f9..c44e2f5 100644
--- 
a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
@@ -26,6 +26,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -85,10 +86,11 @@ public class GobblinTrackingEventFlattenFilterConverter 
extends AvroToAvroConver
       String curFieldName = field.name();
       if (!field.schema().getType().equals(Schema.Type.MAP)) {
         if (fieldsRenameMap.containsKey(curFieldName)) {
-          newFields.add(
-              new Schema.Field(fieldsRenameMap.get(curFieldName), 
field.schema(), field.doc(), field.defaultValue()));
+          
newFields.add(AvroCompatibilityHelper.createSchemaField(fieldsRenameMap.get(curFieldName),
 field.schema(),
+              field.doc(), AvroUtils.getCompatibleDefaultValue(field)));
         } else {
-          newFields.add(new Schema.Field(curFieldName, field.schema(), 
field.doc(), field.defaultValue()));
+          
newFields.add(AvroCompatibilityHelper.createSchemaField(curFieldName, 
field.schema(), field.doc(),
+              AvroUtils.getCompatibleDefaultValue(field)));
         }
         this.nonMapFields.add(curFieldName);
       } else {
@@ -102,7 +104,7 @@ public class GobblinTrackingEventFlattenFilterConverter 
extends AvroToAvroConver
     for (String fieldToFlatten : ConfigUtils.getStringList(config, 
FIELDS_TO_FLATTEN)) {
       String newFieldName =
           this.fieldsRenameMap.containsKey(fieldToFlatten) ? 
this.fieldsRenameMap.get(fieldToFlatten) : fieldToFlatten;
-      newFields.add(new Field(newFieldName, Schema.create(Schema.Type.STRING), 
"", null));
+      newFields.add(AvroCompatibilityHelper.createSchemaField(newFieldName, 
Schema.create(Schema.Type.STRING), "", null));
     }
 
     return this;
diff --git 
a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java
 
b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java
index 669d040..76aadbe 100644
--- 
a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java
+++ 
b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java
@@ -35,18 +35,30 @@ public class AvroSchemaFieldRemoverTest {
 
   @Test
   public void testRemoveFields() throws IllegalArgumentException, IOException {
+    // Avro 1.9.2 uses Object as default value and uses internal 
JacksonUtils.toJson to convert the Object default value
+    // to JsonNode. Enum default value is not supported and hence the 
conversion fails. Updated the input schema and
+    // removed the non null default value for enum.
     Schema convertedSchema1 = 
convertSchema("/converter/recursive_schema_1.avsc", "YwchQiH.OjuzrLOtmqLW");
     Schema expectedSchema1 = 
parseSchema("/converter/recursive_schema_1_converted.avsc");
-    Assert.assertEquals(convertedSchema1, expectedSchema1);
+    Assert.assertEquals(convertedSchema1.toString(), 
expectedSchema1.toString());
 
+    // Avro 1.9.2 uses Object as default value and uses internal 
JacksonUtils.toJson to convert the Object default value
+    // to JsonNode. Default value of record type  is not supported and hence 
the conversion fails. Updated the input
+    // schema and removed the non null default value for enum. In earlier Avro 
1.8.1 there was no such conversion.
+    // For example, remove the below default value.
+    // "default": {
+    //     "AUjmPup": null,
+    //     "IFBRClOa": "zG",
+    //     "sZIVnwv": null
+    // }
     Schema convertedSchema2 =
         convertSchema("/converter/recursive_schema_2.avsc", 
"FBuKC.wIINqII.lvaerUEKxBQUWg,eFQjDj.TzuYZajb");
     Schema expectedSchema2 = 
parseSchema("/converter/recursive_schema_2_converted.avsc");
-    Assert.assertEquals(convertedSchema2, expectedSchema2);
+    Assert.assertEquals(convertedSchema2.toString(), 
expectedSchema2.toString());
 
     Schema convertedSchema3 = 
convertSchema("/converter/recursive_schema_2.avsc", 
"field.that.does.not.exist");
     Schema expectedSchema3 = 
parseSchema("/converter/recursive_schema_2_not_converted.avsc");
-    Assert.assertEquals(convertedSchema3, expectedSchema3);
+    Assert.assertEquals(convertedSchema3.toString(), 
expectedSchema3.toString());
   }
 
   private Schema parseSchema(String schemaFile) throws IOException {
diff --git 
a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java
 
b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java
index 0989988..4c1577d 100644
--- 
a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java
+++ 
b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java
@@ -45,12 +45,13 @@ public class GobblinTrackingEventFlattenFilterConverterTest 
{
     Schema output = converter.convertSchema(
         new 
Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")),
         workUnitState);
-    Assert.assertEquals(output, new Schema.Parser().parse(
+    Schema parsedSchema = new Schema.Parser().parse(
         
"{\"type\":\"record\",\"name\":\"GobblinTrackingEvent\",\"namespace\":\"org.apache.gobblin.metrics\",\"fields\":"
             + "[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time at 
which event was created.\",\"default\":0},"
             + 
"{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace 
used for filtering of events.\"},"
             + "{\"name\":\"name\",\"type\":\"string\",\"doc\":\"Event 
name.\"},{\"name\":\"field1\",\"type\":\"string\",\"doc\":\"\"},"
-            + "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}"));
+            + "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}");
+    Assert.assertEquals(output.toString(), parsedSchema.toString());
 
     props.put(GobblinTrackingEventFlattenFilterConverter.class.getSimpleName() 
+ "."
         + GobblinTrackingEventFlattenFilterConverter.FIELDS_RENAME_MAP, 
"name:eventName,field1:field3");
@@ -61,11 +62,12 @@ public class GobblinTrackingEventFlattenFilterConverterTest 
{
     Schema output2 = converter.convertSchema(
         new 
Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")),
         workUnitState2);
-    Assert.assertEquals(output2, new Schema.Parser().parse(
+    parsedSchema = new Schema.Parser().parse(
         
"{\"type\":\"record\",\"name\":\"GobblinTrackingEvent\",\"namespace\":\"org.apache.gobblin.metrics\",\"fields\":"
             + "[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time at 
which event was created.\",\"default\":0},"
             + 
"{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace 
used for filtering of events.\"},"
             + "{\"name\":\"eventName\",\"type\":\"string\",\"doc\":\"Event 
name.\"},{\"name\":\"field3\",\"type\":\"string\",\"doc\":\"\"},"
-            + "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}"));
+            + "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}");
+    Assert.assertEquals(output2.toString(), parsedSchema.toString());
   }
 }
diff --git 
a/gobblin-core-base/src/test/resources/converter/recursive_schema_1.avsc 
b/gobblin-core-base/src/test/resources/converter/recursive_schema_1.avsc
index 4b41e2d..3b0070a 100644
--- a/gobblin-core-base/src/test/resources/converter/recursive_schema_1.avsc
+++ b/gobblin-core-base/src/test/resources/converter/recursive_schema_1.avsc
@@ -1 +1 @@
-{"type":"record","name":"VyPswKoukcXEZshQrXnE","namespace":"PKA.POshikUo.flXRgM.aBxSQzgOe","fields":[{"name":"jRqjDF","type":{"type":"record","name":"ZDlOMWcUTCk","namespace":"PKA.POshikUo.flXRgM","fields":[{"name":"GJiZXGQc","type":"int","doc":"Vdj
 NXTFcrls GsWTlJ Lw WS oWm ZbYt OxeGXFqHeG oWm gQpsyC NXTFcrls GsWTlJ ISn yqw 
tVlMiJQo XECvKHj JUyZ BGwP PMEQNW yqw qfzNChDuzDx GwBGpC jM BGwP Ox r xOrwhzll 
lIVyeB","meta":"field_meta"},{"name":"NZUOnQgci","type":["null","string"],"doc":"Vdj
 N [...]
+{"type":"record","name":"VyPswKoukcXEZshQrXnE","namespace":"PKA.POshikUo.flXRgM.aBxSQzgOe","fields":[{"name":"jRqjDF","type":{"type":"record","name":"ZDlOMWcUTCk","namespace":"PKA.POshikUo.flXRgM","fields":[{"name":"GJiZXGQc","type":"int","doc":"Vdj
 NXTFcrls GsWTlJ Lw WS oWm ZbYt OxeGXFqHeG oWm gQpsyC NXTFcrls GsWTlJ ISn yqw 
tVlMiJQo XECvKHj JUyZ BGwP PMEQNW yqw qfzNChDuzDx GwBGpC jM BGwP Ox r xOrwhzll 
lIVyeB","meta":"field_meta"},{"name":"NZUOnQgci","type":["null","string"],"doc":"Vdj
 N [...]
diff --git 
a/gobblin-core-base/src/test/resources/converter/recursive_schema_1_converted.avsc
 
b/gobblin-core-base/src/test/resources/converter/recursive_schema_1_converted.avsc
index 66a8fae..0b4b245 100644
--- 
a/gobblin-core-base/src/test/resources/converter/recursive_schema_1_converted.avsc
+++ 
b/gobblin-core-base/src/test/resources/converter/recursive_schema_1_converted.avsc
@@ -1 +1 @@
-{"type":"record","name":"VyPswKoukcXEZshQrXnE","namespace":"PKA.POshikUo.flXRgM.aBxSQzgOe","fields":[{"name":"jRqjDF","type":{"type":"record","name":"ZDlOMWcUTCk","namespace":"PKA.POshikUo.flXRgM","fields":[{"name":"GJiZXGQc","type":"int","doc":"Vdj
 NXTFcrls GsWTlJ Lw WS oWm ZbYt OxeGXFqHeG oWm gQpsyC NXTFcrls GsWTlJ ISn yqw 
tVlMiJQo XECvKHj JUyZ BGwP PMEQNW yqw qfzNChDuzDx GwBGpC jM BGwP Ox r xOrwhzll 
lIVyeB","meta":"field_meta"},{"name":"NZUOnQgci","type":["null","string"],"doc":"Vdj
 N [...]
+{"type":"record","name":"VyPswKoukcXEZshQrXnE","namespace":"PKA.POshikUo.flXRgM.aBxSQzgOe","fields":[{"name":"jRqjDF","type":{"type":"record","name":"ZDlOMWcUTCk","namespace":"PKA.POshikUo.flXRgM","fields":[{"name":"GJiZXGQc","type":"int","doc":"Vdj
 NXTFcrls GsWTlJ Lw WS oWm ZbYt OxeGXFqHeG oWm gQpsyC NXTFcrls GsWTlJ ISn yqw 
tVlMiJQo XECvKHj JUyZ BGwP PMEQNW yqw qfzNChDuzDx GwBGpC jM BGwP Ox r xOrwhzll 
lIVyeB","meta":"field_meta"},{"name":"NZUOnQgci","type":["null","string"],"doc":"Vdj
 N [...]
diff --git 
a/gobblin-core-base/src/test/resources/converter/recursive_schema_2.avsc 
b/gobblin-core-base/src/test/resources/converter/recursive_schema_2.avsc
index 5a15eae..c708ecb 100644
--- a/gobblin-core-base/src/test/resources/converter/recursive_schema_2.avsc
+++ b/gobblin-core-base/src/test/resources/converter/recursive_schema_2.avsc
@@ -1 +1 @@
-{"type":"record","name":"XQLFognzBcWX","namespace":"IoV.uIwHpaVy.dUV","fields":[{"name":"eFQjDj","type":["null",{"type":"record","name":"CWlSbaEOPjQ","fields":[{"name":"TzuYZajb","type":"int","doc":"BLD
 isOtza xj fS jty qGGt PpaPkENfDJ jty 
HTvfxa"},{"name":"zMMEITWsv","type":["null","string"],"doc":"Pnu fS jty qGGt 
PpaPkENfDJ jty HTvfxa","default":null},{"name":"kTrp","type":"long","doc":"BLD 
kTrp fS jty FBuKC"},{"name":"kdYkvr","type":"string","doc":"BLD name fS jty 
kdYkvr"},{"name":"jf [...]
+{"type":"record","name":"XQLFognzBcWX","namespace":"IoV.uIwHpaVy.dUV","fields":[{"name":"eFQjDj","type":["null",{"type":"record","name":"CWlSbaEOPjQ","fields":[{"name":"TzuYZajb","type":"int","doc":"BLD
 isOtza xj fS jty qGGt PpaPkENfDJ jty 
HTvfxa"},{"name":"zMMEITWsv","type":["null","string"],"doc":"Pnu fS jty qGGt 
PpaPkENfDJ jty HTvfxa","default":null},{"name":"kTrp","type":"long","doc":"BLD 
kTrp fS jty FBuKC"},{"name":"kdYkvr","type":"string","doc":"BLD name fS jty 
kdYkvr"},{"name":"jf [...]
diff --git 
a/gobblin-core-base/src/test/resources/converter/recursive_schema_2_converted.avsc
 
b/gobblin-core-base/src/test/resources/converter/recursive_schema_2_converted.avsc
index a733019..8cff0df 100644
--- 
a/gobblin-core-base/src/test/resources/converter/recursive_schema_2_converted.avsc
+++ 
b/gobblin-core-base/src/test/resources/converter/recursive_schema_2_converted.avsc
@@ -1 +1 @@
-{"type":"record","name":"XQLFognzBcWX","namespace":"IoV.uIwHpaVy.dUV","fields":[{"name":"eFQjDj","type":["null",{"type":"record","name":"CWlSbaEOPjQ","fields":[{"name":"zMMEITWsv","type":["null","string"],"doc":"Pnu
 fS jty qGGt PpaPkENfDJ jty 
HTvfxa","default":null},{"name":"kTrp","type":"long","doc":"BLD kTrp fS jty 
FBuKC"},{"name":"kdYkvr","type":"string","doc":"BLD name fS jty 
kdYkvr"},{"name":"jfqWhtm","type":"string","doc":"BLD name fS jty 
jfqWhtm"},{"name":"ORVhoRguhhZ","type":["nu [...]
\ No newline at end of file
+{"type":"record","name":"XQLFognzBcWX","namespace":"IoV.uIwHpaVy.dUV","fields":[{"name":"eFQjDj","type":["null",{"type":"record","name":"CWlSbaEOPjQ","fields":[{"name":"zMMEITWsv","type":["null","string"],"doc":"Pnu
 fS jty qGGt PpaPkENfDJ jty 
HTvfxa","default":null},{"name":"kTrp","type":"long","doc":"BLD kTrp fS jty 
FBuKC"},{"name":"kdYkvr","type":"string","doc":"BLD name fS jty 
kdYkvr"},{"name":"jfqWhtm","type":"string","doc":"BLD name fS jty 
jfqWhtm"},{"name":"ORVhoRguhhZ","type":["nu [...]
\ No newline at end of file
diff --git 
a/gobblin-core-base/src/test/resources/converter/recursive_schema_2_not_converted.avsc
 
b/gobblin-core-base/src/test/resources/converter/recursive_schema_2_not_converted.avsc
index c3130f9..81519b3 100644
--- 
a/gobblin-core-base/src/test/resources/converter/recursive_schema_2_not_converted.avsc
+++ 
b/gobblin-core-base/src/test/resources/converter/recursive_schema_2_not_converted.avsc
@@ -1 +1 @@
-{"type":"record","name":"XQLFognzBcWX","namespace":"IoV.uIwHpaVy.dUV","fields":[{"name":"eFQjDj","type":["null",{"type":"record","name":"CWlSbaEOPjQ","fields":[{"name":"TzuYZajb","type":"int","doc":"BLD
 isOtza xj fS jty qGGt PpaPkENfDJ jty 
HTvfxa"},{"name":"zMMEITWsv","type":["null","string"],"doc":"Pnu fS jty qGGt 
PpaPkENfDJ jty HTvfxa","default":null},{"name":"kTrp","type":"long","doc":"BLD 
kTrp fS jty FBuKC"},{"name":"kdYkvr","type":"string","doc":"BLD name fS jty 
kdYkvr"},{"name":"jf [...]
\ No newline at end of file
+{"type":"record","name":"XQLFognzBcWX","namespace":"IoV.uIwHpaVy.dUV","fields":[{"name":"eFQjDj","type":["null",{"type":"record","name":"CWlSbaEOPjQ","fields":[{"name":"TzuYZajb","type":"int","doc":"BLD
 isOtza xj fS jty qGGt PpaPkENfDJ jty 
HTvfxa"},{"name":"zMMEITWsv","type":["null","string"],"doc":"Pnu fS jty qGGt 
PpaPkENfDJ jty HTvfxa","default":null},{"name":"kTrp","type":"long","doc":"BLD 
kTrp fS jty FBuKC"},{"name":"kdYkvr","type":"string","doc":"BLD name fS jty 
kdYkvr"},{"name":"jf [...]
\ No newline at end of file
diff --git a/gobblin-core/build.gradle b/gobblin-core/build.gradle
index 9a0bd26..9ca2467 100644
--- a/gobblin-core/build.gradle
+++ b/gobblin-core/build.gradle
@@ -36,6 +36,7 @@ dependencies {
   compile externalDependency.commonsMath
   compile externalDependency.commonsHttpClient
   compile externalDependency.avro
+  compile externalDependency.avroCompatHelper
   compile externalDependency.guava
   compile externalDependency.gson
   compile externalDependency.slf4j
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/FlattenNestedKeyConverter.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/FlattenNestedKeyConverter.java
index ccf0819..ec5309b 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/FlattenNestedKeyConverter.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/FlattenNestedKeyConverter.java
@@ -25,6 +25,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 
 import com.google.common.base.CaseFormat;
 import com.google.common.base.Optional;
@@ -75,7 +76,8 @@ public class FlattenNestedKeyConverter extends 
Converter<Schema, Schema, Generic
     List<Field> fields = new ArrayList<>();
     // Clone the existing fields
     for (Field field : inputSchema.getFields()) {
-      fields.add(new Field(field.name(), field.schema(), field.doc(), 
field.defaultValue(), field.order()));
+      fields.add(AvroCompatibilityHelper.createSchemaField(field.name(), 
field.schema(), field.doc(),
+          AvroUtils.getCompatibleDefaultValue(field), field.order()));
     }
 
     // Convert each of nested keys into a top level field
@@ -102,7 +104,8 @@ public class FlattenNestedKeyConverter extends 
Converter<Schema, Schema, Generic
       Field field = optional.get();
 
       // Make a copy under a new name
-      Field copy = new Field(name, field.schema(), field.doc(), 
field.defaultValue(), field.order());
+      Field copy = AvroCompatibilityHelper.createSchemaField(name, 
field.schema(), field.doc(),
+          AvroUtils.getCompatibleDefaultValue(field), field.order());
       fields.add(copy);
     }
 
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
index 0d6707d..fbd05a6 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
@@ -35,7 +35,6 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.EmptyIterable;
 import org.apache.gobblin.converter.json.JsonSchema;
-import org.codehaus.jackson.node.JsonNodeFactory;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -45,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 
 import lombok.extern.java.Log;
 import sun.util.calendar.ZoneInfo;
@@ -618,8 +618,10 @@ public class JsonElementConversionFactory {
           throw new UnsupportedOperationException(e);
         }
 
-        Schema.Field fld = new Schema.Field(map.getColumnName(), fldSchema, 
map.getComment(),
-            map.isNullable() ? JsonNodeFactory.instance.nullNode() : null);
+        // [Avro 1.9.2 upgrade] No need to pass 
JsonNodeFactory.instance.nullNode() if map is nullable.
+        // AvroCompatibilityHelper will take care of this.
+        Schema.Field fld = 
AvroCompatibilityHelper.createSchemaField(map.getColumnName(), fldSchema, 
map.getComment(),
+            null);
         fld.addProp(SOURCE_TYPE, sourceType);
         fields.add(fld);
       }
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
index 6f341fe..e1499ab 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
@@ -26,6 +26,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericRecord;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -144,14 +145,14 @@ public class AvroFieldsPickConverter extends 
AvroToAvroConverterBase {
       Preconditions.checkNotNull(innerSrcField, child.val + " does not exist 
under " + recordSchema);
 
       if (child.children.isEmpty()) { //Leaf
-        newFields.add(
-            new Field(innerSrcField.name(), innerSrcField.schema(), 
innerSrcField.doc(), innerSrcField.defaultValue()));
+        
newFields.add(AvroCompatibilityHelper.createSchemaField(innerSrcField.name(), 
innerSrcField.schema(),
+            innerSrcField.doc(), 
AvroUtils.getCompatibleDefaultValue(innerSrcField)));
       } else {
         Schema innerSrcSchema = innerSrcField.schema();
 
         Schema innerDestSchema = createSchemaHelper(innerSrcSchema, child); 
//Recurse of schema
-        Field innerDestField =
-            new Field(innerSrcField.name(), innerDestSchema, 
innerSrcField.doc(), innerSrcField.defaultValue());
+        Field innerDestField = 
AvroCompatibilityHelper.createSchemaField(innerSrcField.name(), innerDestSchema,
+            innerSrcField.doc(), 
AvroUtils.getCompatibleDefaultValue(innerSrcField));
         newFields.add(innerDestField);
       }
     }
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
index d1244e8..1bff418 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
@@ -88,7 +88,7 @@ public class AvroFieldsPickConverterTest {
         while (expectedDataFileReader.hasNext()) {
           GenericRecord expected = expectedDataFileReader.next();
           GenericRecord actual = converter.convertRecord(convertedSchema, 
srcDataFileReader.next(), workUnitState).iterator().next();
-          Assert.assertEquals(actual, expected);
+          Assert.assertEquals(actual.toString(), expected.toString());
         }
         Assert.assertTrue(!srcDataFileReader.hasNext());
       }
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
index 17e46b0..8987561 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
@@ -139,6 +139,11 @@ public class AvroGenericRecordAccessorTest {
 
   @Test
   public void testGetMultiConvertsStrings() throws IOException {
+    // The below error is due to invalid avro data. Type with "null" union 
must have "null" first and then
+    // actual type. This is corrected in fieldPickInput.avsc file and 
fieldPickInput_arrays.avro
+    // Error: org.apache.avro.AvroTypeException: Invalid default for field 
favorite_quotes: null
+    // not a [{"type":"array","items":"string"},"null"]
+    // Correct data: "type": ["null", { "type": "array", "items": "string"}]
     updateRecordFromTestResource("converter/fieldPickInput", 
"converter/fieldPickInput_arrays.avro");
     Map<String, Object> ret = accessor.getMultiGeneric("favorite_quotes");
     Object val = ret.get("favorite_quotes");
diff --git a/gobblin-core/src/test/resources/converter/complex3.json 
b/gobblin-core/src/test/resources/converter/complex3.json
index 6f57998..9872d51 100644
--- a/gobblin-core/src/test/resources/converter/complex3.json
+++ b/gobblin-core/src/test/resources/converter/complex3.json
@@ -13,8 +13,8 @@
             "isNullable": true,
             "dataType": {
               "type": [
-                "int",
-                "null"
+                "null",
+                "int"
               ]
             }
           },
@@ -263,12 +263,12 @@
               "name": "id",
               "type": [
                 {
-                  "type": "int",
-                  "source.type": "int"
-                },
-                {
                   "type": "null",
                   "source.type": "null"
+                },
+                {
+                  "type": "int",
+                  "source.type": "int"
                 }
               ],
               "doc": "System-assigned numeric user ID. Cannot be changed by 
the user.",
diff --git a/gobblin-core/src/test/resources/converter/fieldPickInput.avsc 
b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
index 4da3fdf..17e7fda 100644
--- a/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
+++ b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
@@ -8,6 +8,6 @@
      {"name": "date_of_birth", "type": "long"},
      {"name": "last_modified", "type": "long"},
      {"name": "created", "type": "long"},
-     {"name": "favorite_quotes", "type": [{ "type": "array", "items": 
"string"}, "null"], "default": null}
+     {"name": "favorite_quotes", "type": ["null", { "type": "array", "items": 
"string"}], "default": null}
  ]
 }
diff --git 
a/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro 
b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro
index c10a607..13130a8 100644
Binary files 
a/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro and 
b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro differ
diff --git a/gobblin-core/src/test/resources/serde/serde.avro 
b/gobblin-core/src/test/resources/serde/serde.avro
index fe23a8d..977e380 100644
Binary files a/gobblin-core/src/test/resources/serde/serde.avro and 
b/gobblin-core/src/test/resources/serde/serde.avro differ
diff --git a/gobblin-core/src/test/resources/serde/serde.avsc 
b/gobblin-core/src/test/resources/serde/serde.avsc
index 470827a..c7f3b55 100644
--- a/gobblin-core/src/test/resources/serde/serde.avsc
+++ b/gobblin-core/src/test/resources/serde/serde.avsc
@@ -3,7 +3,7 @@
  "name": "User",
  "fields": [
      {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]}
+     {"name": "favorite_number",  "type": ["null", "int"]},
+     {"name": "favorite_color", "type": ["null", "string"]}
  ]
 }
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
 
b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
index d16766c..6bfc2ba 100644
--- 
a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
+++ 
b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
@@ -97,6 +97,11 @@ public class AvroStringFieldEncryptorConverterTest {
     wuState.getJobState().setProp("converter.encrypt.algorithm", 
"insecure_shift");
 
     converter.init(wuState);
+    // The below error is due to invalid avro data. Type with "null" union 
must have "null" first and then
+    // actual type. This is corrected in fieldPickInput.avsc and 
fieldPickInput_arrays.avro
+    // Error: org.apache.avro.AvroTypeException: Invalid default for field 
favorite_quotes: null
+    // not a [{"type":"array","items":"string"},"null"]
+    // Correct data: "type": ["null", { "type": "array", "items": "string"}]
     GenericRecord inputRecord =
         
getRecordFromFile(getClass().getClassLoader().getResource("fieldPickInput_arrays.avro").getPath());
     GenericArray origValues = (GenericArray) 
inputRecord.get("favorite_quotes");
diff --git 
a/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
 
b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
index c10a607..13130a8 100644
Binary files 
a/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
 and 
b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
 differ
diff --git a/gobblin-modules/gobblin-kafka-common/build.gradle 
b/gobblin-modules/gobblin-kafka-common/build.gradle
index 00954b9..d9a83e6 100644
--- a/gobblin-modules/gobblin-kafka-common/build.gradle
+++ b/gobblin-modules/gobblin-kafka-common/build.gradle
@@ -27,6 +27,7 @@ dependencies {
   compile project(":gobblin-core-base")
 
   compile externalDependency.avro
+  compile externalDependency.avroCompatHelper
   compile externalDependency.confluentSchemaRegistryClient
   compile externalDependency.commonsCodec
   compile externalDependency.commonsHttpClient
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
index 6408a4c..198490a 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
@@ -24,7 +24,9 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.util.AvroUtils;
 
 
 /**
@@ -70,7 +72,8 @@ public class EnvelopePayloadConverter extends 
BaseEnvelopeSchemaConverter<Generi
       return createLatestPayloadField(field);
     }
     // Make a copy of the field to the output schema
-    return new Field(field.name(), field.schema(), field.doc(), 
field.defaultValue(), field.order());
+    return AvroCompatibilityHelper.createSchemaField(field.name(), 
field.schema(), field.doc(),
+        AvroUtils.getCompatibleDefaultValue(field), field.order());
   }
 
   /**
@@ -83,7 +86,8 @@ public class EnvelopePayloadConverter extends 
BaseEnvelopeSchemaConverter<Generi
       throws SchemaConversionException {
     try {
       Schema payloadSchema = fetchLatestPayloadSchema();
-      return new Field(field.name(), payloadSchema, DECORATED_PAYLOAD_DOC, 
field.defaultValue(), field.order());
+      return AvroCompatibilityHelper.createSchemaField(field.name(), 
payloadSchema, DECORATED_PAYLOAD_DOC,
+          AvroUtils.getCompatibleDefaultValue(field), field.order());
     } catch (Exception e) {
       throw new SchemaConversionException(e);
     }
diff --git 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/PoliciesResourceTest.java
 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/PoliciesResourceTest.java
index c22062c..71629af 100644
--- 
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/PoliciesResourceTest.java
+++ 
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/PoliciesResourceTest.java
@@ -38,7 +38,7 @@ public class PoliciesResourceTest {
     ThrottlingPolicyFactory factory = new ThrottlingPolicyFactory();
     SharedLimiterKey res1key = new SharedLimiterKey("res1");
 
-    Map<String, String> configMap = 
avro.shaded.com.google.common.collect.ImmutableMap.<String, String>builder()
+    Map<String, String> configMap = 
com.google.common.collect.ImmutableMap.<String, String>builder()
         .put(BrokerConfigurationKeyGenerator.generateKey(factory, res1key, 
null, ThrottlingPolicyFactory.POLICY_KEY),
             CountBasedPolicy.FACTORY_ALIAS)
         .put(BrokerConfigurationKeyGenerator.generateKey(factory, res1key, 
null, CountBasedPolicy.COUNT_KEY), "100")
diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle
index 78f87c5..17fb469 100644
--- a/gobblin-utility/build.gradle
+++ b/gobblin-utility/build.gradle
@@ -30,6 +30,8 @@ dependencies {
   compile externalDependency.guava
   compile externalDependency.slf4j
   compile externalDependency.avro
+  compile externalDependency.avroCompiler
+  compile externalDependency.avroCompatHelper
   compile externalDependency.hiveMetastore
   compile externalDependency.jodaTime
   compile externalDependency.jacksonCore
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
index 7a15357..8d1a295 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
@@ -411,7 +412,8 @@ public class AvroFlattener {
             }
           }
         }
-        Schema.Field field = new Schema.Field(flattenName, 
flattenedFieldSchema, f.doc(), f.defaultValue(), f.order());
+        Schema.Field field = 
AvroCompatibilityHelper.createSchemaField(flattenName, flattenedFieldSchema, 
f.doc(),
+            AvroUtils.getCompatibleDefaultValue(f), f.order());
 
         if (StringUtils.isNotBlank(flattenSource)) {
           field.addProp(FLATTENED_SOURCE_KEY, flattenSource);
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index d976ea7..e657b37 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -51,6 +51,7 @@ import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.mapred.FsInput;
 import org.apache.avro.util.Utf8;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.math3.util.Pair;
 import org.apache.hadoop.conf.Configuration;
@@ -77,6 +78,7 @@ import com.google.common.collect.Maps;
 import com.google.common.io.Closer;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.ToString;
@@ -134,8 +136,9 @@ public class AvroUtils {
   public static List<Field> deepCopySchemaFields(Schema readerSchema) {
     return readerSchema.getFields().stream()
         .map(field -> {
-          Field f = new Field(field.name(), field.schema(), field.doc(), 
field.defaultValue(), field.order());
-          field.getProps().forEach((key, value) -> f.addProp(key, value));
+          Field f = AvroCompatibilityHelper.createSchemaField(field.name(), 
field.schema(), field.doc(),
+              getCompatibleDefaultValue(field), field.order());
+          field.getObjectProps().forEach((key, value) -> f.addProp(key, 
value));
           return f;
         })
         .collect(Collectors.toList());
@@ -604,7 +607,8 @@ public class AvroUtils {
 
     List<Field> combinedFields = Lists.newArrayList();
     for (Field newFld : newSchema.getFields()) {
-      combinedFields.add(new Field(newFld.name(), newFld.schema(), 
newFld.doc(), newFld.defaultValue()));
+      
combinedFields.add(AvroCompatibilityHelper.createSchemaField(newFld.name(), 
newFld.schema(), newFld.doc(),
+          getCompatibleDefaultValue(newFld)));
     }
 
     for (Field oldFld : oldSchema.getFields()) {
@@ -619,12 +623,15 @@ public class AvroUtils {
             }
           }
           Schema newFldSchema = Schema.createUnion(union);
-          combinedFields.add(new Field(oldFld.name(), newFldSchema, 
oldFld.doc(), oldFld.defaultValue()));
+          
combinedFields.add(AvroCompatibilityHelper.createSchemaField(oldFld.name(), 
newFldSchema, oldFld.doc(),
+              getCompatibleDefaultValue(oldFld)));
         } else {
           union.add(Schema.create(Type.NULL));
           union.add(oldFldSchema);
           Schema newFldSchema = Schema.createUnion(union);
-          combinedFields.add(new Field(oldFld.name(), newFldSchema, 
oldFld.doc(), oldFld.defaultValue()));
+          Object obj = getCompatibleDefaultValue(oldFld);
+          
combinedFields.add(AvroCompatibilityHelper.createSchemaField(oldFld.name(), 
newFldSchema, oldFld.doc(),
+              getCompatibleDefaultValue(oldFld)));
         }
       }
     }
@@ -673,7 +680,8 @@ public class AvroUtils {
     for (Field field : record.getFields()) {
       Optional<Schema> newFieldSchema = 
removeUncomparableFields(field.schema(), processed);
       if (newFieldSchema.isPresent()) {
-        fields.add(new Field(field.name(), newFieldSchema.get(), field.doc(), 
field.defaultValue()));
+        fields.add(AvroCompatibilityHelper.createSchemaField(field.name(), 
newFieldSchema.get(), field.doc(),
+            getCompatibleDefaultValue(field)));
       }
     }
 
@@ -733,7 +741,8 @@ public class AvroUtils {
         if (null == input) {
           return null;
         }
-        Field field = new Field(input.name(), input.schema(), input.doc(), 
input.defaultValue(), input.order());
+        Field field = AvroCompatibilityHelper.createSchemaField(input.name(), 
input.schema(), input.doc(),
+            getCompatibleDefaultValue(input), input.order());
         return field;
       }
     });
@@ -776,8 +785,8 @@ public class AvroUtils {
         List<Schema.Field> newFields = new ArrayList<>();
         if (schema.getFields().size() > 0) {
           for (Schema.Field oldField : schema.getFields()) {
-            Field newField = new Field(oldField.name(), 
switchNamespace(oldField.schema(), namespaceOverride), oldField.doc(),
-                oldField.defaultValue(), oldField.order());
+            Field newField = 
AvroCompatibilityHelper.createSchemaField(oldField.name(), 
switchNamespace(oldField.schema(),
+                namespaceOverride), oldField.doc(), 
getCompatibleDefaultValue(oldField), oldField.order());
             // Copy field level properties
             copyFieldProperties(oldField, newField);
             newFields.add(newField);
@@ -1088,8 +1097,8 @@ public class AvroUtils {
           Schema copiedFieldSchema = dropRecursive(fieldSchemaEntry, 
newParents, fieldsWithRecursion);
           if (copiedFieldSchema == null) {
           } else {
-            Schema.Field copiedField =
-                new Schema.Field(field.name(), copiedFieldSchema, field.doc(), 
field.defaultValue(), field.order());
+            Schema.Field copiedField = 
AvroCompatibilityHelper.createSchemaField(field.name(), copiedFieldSchema,
+                field.doc(), getCompatibleDefaultValue(field), field.order());
             copyFieldProperties(field, copiedField);
             copiedSchemaFields.add(copiedField);
           }
@@ -1142,7 +1151,16 @@ public class AvroUtils {
    * @param copiedField
    */
   private static void copyFieldProperties(Schema.Field sourceField, 
Schema.Field copiedField) {
-    sourceField.getProps().forEach((key, value) -> copiedField.addProp(key, 
value));
+    sourceField.getObjectProps().forEach((key, value) -> 
copiedField.addProp(key, value));
   }
 
+  @Nullable
+  public static Object getCompatibleDefaultValue(Schema.Field field) {
+    return AvroCompatibilityHelper.fieldHasDefault(field)
+        ? AvroCompatibilityHelper.getGenericDefaultValue(field)
+        : null;
+  }
+
+
+
 }
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroFlattenerTest.java 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroFlattenerTest.java
index 521bda2..c6889da 100644
--- 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroFlattenerTest.java
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroFlattenerTest.java
@@ -114,8 +114,7 @@ public class AvroFlattenerTest {
 
     Schema originalSchema = 
readSchemaFromJsonFile("optionWithinOptionWithinRecord_original.json");
     Schema expectedSchema = 
readSchemaFromJsonFile("optionWithinOptionWithinRecord_flattened.json");
-
-    Assert.assertEquals(new AvroFlattener().flatten(originalSchema, false), 
expectedSchema);
+    Assert.assertEquals(new AvroFlattener().flatten(originalSchema, 
false).toString(), expectedSchema.toString());
   }
 
   /**
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 6f18a35..931ac34 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -49,9 +49,9 @@ import org.apache.avro.util.internal.JacksonUtils;
 import org.apache.commons.math3.util.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ArrayNode;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -101,7 +101,7 @@ public class AvroUtilsTest {
 
     Schema expectedOutputSchema1 =
         new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"test\", " 
+ "\"fields\":["
-            + "{\"name\": \"name\", \"type\": \"string\"}, " + "{\"name\": 
\"number\", \"type\": [\"null\", \"int\"]}"
+            + "{\"name\": \"name\", \"type\": \"string\"}, " + "{\"name\": 
\"number\", \"type\": [\"null\", \"int\"],\"default\":null}]}"
             + "]}");
 
     Assert.assertEquals(expectedOutputSchema1, 
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema1, newSchema1));
@@ -118,7 +118,7 @@ public class AvroUtilsTest {
     Schema expectedOutputSchema2 =
         new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"test\", " 
+ "\"fields\":["
             + "{\"name\": \"name\", \"type\": \"string\"}, "
-            + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": 
\"array\", \"items\": \"string\"}]}" + "]}");
+            + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": 
\"array\", \"items\": \"string\"}],\"default\":null}]}" + "]}");
 
     Assert.assertEquals(expectedOutputSchema2, 
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema2, newSchema2));
   }
@@ -146,10 +146,10 @@ public class AvroUtilsTest {
             .parse("{\"type\":\"record\", \"name\":\"test\", "
                 + "\"fields\":["
                 + "{\"name\": \"name\", \"type\": \"string\"}, "
-                + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": 
\"string\"}, {\"type\": \"array\", \"items\": \"string\"}]}"
+                + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": 
\"string\"}, {\"type\": \"array\", \"items\": \"string\"}], \"default\": 
null}]}"
                 + "]}");
 
-    Assert.assertEquals(expectedOutputSchema1, 
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema1, newSchema1));
+    Assert.assertEquals(expectedOutputSchema1.toString(), 
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema1, newSchema1).toString());
 
     Schema oldSchema2 =
         new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"test\", " 
+ "\"fields\":["
@@ -163,9 +163,9 @@ public class AvroUtilsTest {
     Schema expectedOutputSchema2 =
         new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"test\", " 
+ "\"fields\":["
             + "{\"name\": \"name\", \"type\": \"string\"}, "
-            + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": 
\"array\", \"items\": \"string\"}]}" + "]}");
+            + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": 
\"array\", \"items\": \"string\"}], \"default\": null}" + "]}");
 
-    Assert.assertEquals(expectedOutputSchema2, 
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema2, newSchema2));
+    Assert.assertEquals(expectedOutputSchema2.toString(), 
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema2, newSchema2).toString());
   }
 
   /**
@@ -190,10 +190,10 @@ public class AvroUtilsTest {
             .parse("{\"type\":\"record\", \"name\":\"test\", "
                 + "\"fields\":["
                 + "{\"name\": \"name\", \"type\": \"string\"}, "
-                + "{\"name\": \"color\", \"type\": [\"null\", \"string\"]}, "
-                + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": 
\"string\"}, {\"type\": \"array\", \"items\": \"string\"}]}"
+                + "{\"name\": \"color\", \"type\": [\"null\", \"string\"], 
\"default\": null}, "
+                + "{\"name\": \"number\", \"type\": [\"null\", {\"type\": 
\"string\"}, {\"type\": \"array\", \"items\": \"string\"}], \"default\": 
null}]}"
                 + "]}");
-    Assert.assertEquals(expectedOutputSchema, 
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema, newSchema));
+    Assert.assertEquals(expectedOutputSchema.toString(), 
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema, newSchema).toString());
   }
 
   /**
diff --git 
a/gobblin-utility/src/test/resources/flattenAvro/optionWithinOptionWithinRecord_flattened.json
 
b/gobblin-utility/src/test/resources/flattenAvro/optionWithinOptionWithinRecord_flattened.json
index dc4e37d..c2dd3a6 100644
--- 
a/gobblin-utility/src/test/resources/flattenAvro/optionWithinOptionWithinRecord_flattened.json
+++ 
b/gobblin-utility/src/test/resources/flattenAvro/optionWithinOptionWithinRecord_flattened.json
@@ -4,15 +4,18 @@
   "fields" : [ {
     "name" : 
"parentFieldUnion__unionRecordMemberFieldUnion__superNestedFieldString1",
     "type" : [ "null", "string" ],
-    "flatten_source" : 
"parentFieldUnion.unionRecordMemberFieldUnion.superNestedFieldString1"
+    "flatten_source" : 
"parentFieldUnion.unionRecordMemberFieldUnion.superNestedFieldString1",
+    "default": null
   }, {
     "name" : 
"parentFieldUnion__unionRecordMemberFieldUnion__superNestedFieldString2",
     "type" : [ "null", "string" ],
-    "flatten_source" : 
"parentFieldUnion.unionRecordMemberFieldUnion.superNestedFieldString2"
+    "flatten_source" : 
"parentFieldUnion.unionRecordMemberFieldUnion.superNestedFieldString2",
+    "default": null
   }, {
     "name" : "parentFieldUnion__unionRecordMemberFieldString",
     "type" : [ "null", "string" ],
-    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldString"
+    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldString",
+    "default": null
   }, {
     "name" : "parentFieldInt",
     "type" : "int"
diff --git 
a/gobblin-utility/src/test/resources/flattenAvro/recordWithinOptionWithinRecord_flattened.json
 
b/gobblin-utility/src/test/resources/flattenAvro/recordWithinOptionWithinRecord_flattened.json
index f6ca3cf..9b6c8b9 100644
--- 
a/gobblin-utility/src/test/resources/flattenAvro/recordWithinOptionWithinRecord_flattened.json
+++ 
b/gobblin-utility/src/test/resources/flattenAvro/recordWithinOptionWithinRecord_flattened.json
@@ -4,11 +4,13 @@
   "fields" : [ {
     "name" : "parentFieldUnion__unionRecordMemberFieldLong",
     "type" : [ "null", "long" ],
-    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldLong"
+    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldLong",
+    "default": null
   }, {
     "name" : "parentFieldUnion__unionRecordMemberFieldString",
     "type" : [ "null", "string" ],
-    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldString"
+    "flatten_source" : "parentFieldUnion.unionRecordMemberFieldString",
+    "default": null
   }, {
     "name" : "parentFieldInt",
     "type" : "int"
diff --git a/gradle/scripts/defaultBuildProperties.gradle 
b/gradle/scripts/defaultBuildProperties.gradle
index fec6ef9..3d8ce31 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -23,14 +23,14 @@ def BuildProperties BUILD_PROPERTIES = new 
BuildProperties(project)
     .register(new BuildProperty("sonatypeArtifactSnapshotRepository", 
"https://oss.sonatype.org/content/repositories/snapshots/";, "Maven repository 
to publish artifacts"))
     .register(new BuildProperty("nexusArtifactRepository", 
"https://repository.apache.org/service/local/staging/deploy/maven2";, "Maven 
repository to publish artifacts"))
     .register(new BuildProperty("nexusArtifactSnapshotRepository", 
"https://repository.apache.org/content/repositories/snapshots";, "Maven 
repository to publish artifacts"))
-    .register(new BuildProperty("avroVersion", "1.8.1", "Avro dependencies 
version"))
+    .register(new BuildProperty("avroVersion", "1.9.2", "Avro dependencies 
version"))
     .register(new BuildProperty("awsVersion", "1.11.8", "AWS dependencies 
version"))
     .register(new BuildProperty("bytemanVersion", "4.0.5", "Byteman 
dependencies version"))
     .register(new BuildProperty("confluentVersion", "2.0.1", "confluent 
dependencies version"))
     .register(new BuildProperty("doNotSignArtifacts", false, "Do not sight 
Maven artifacts"))
     .register(new BuildProperty("gobblinFlavor", "standard", "Build flavor 
(see http://gobblin.readthedocs.io/en/latest/developer-guide/GobblinModules/)"))
     .register(new BuildProperty("hadoopVersion", "2.3.0", "Hadoop dependencies 
version"))
-    .register(new BuildProperty("hiveVersion", "1.0.1", "Hive dependencies 
version"))
+    .register(new BuildProperty("hiveVersion", "1.0.1-avro", "Hive 
dependencies version"))
     .register(new BuildProperty("icebergVersion", "0.10.0", "Iceberg 
dependencies version"))
     .register(new BuildProperty("jdkVersion", 
JavaVersion.VERSION_1_8.toString(),
     "Java languange compatibility; supported versions: " + 
JavaVersion.VERSION_1_8))
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index 670a0d0..b8ac319 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -25,6 +25,8 @@ ext.externalDependency = [
     "antlrRuntime": "org.antlr:antlr-runtime:3.5.2",
     "avro": "org.apache.avro:avro:" + avroVersion,
     "avroMapredH2": "org.apache.avro:avro-mapred:" + avroVersion,
+    "avroCompatHelper": "com.linkedin.avroutil1:helper-all:0.2.71",
+    "avroCompiler": "org.apache.avro:avro-compiler:" + avroVersion,
     "awsCore": "com.amazonaws:aws-java-sdk-core:" + awsVersion,
     "awsAsg": "com.amazonaws:aws-java-sdk-autoscaling:" + awsVersion,
     "awsAppAsg": "com.amazonaws:aws-java-sdk-applicationautoscaling:" + 
awsVersion,
@@ -66,12 +68,12 @@ ext.externalDependency = [
     "hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0",
     "hdrHistogram": "org.hdrhistogram:HdrHistogram:2.1.11",
     "helix": "org.apache.helix:helix-core:0.9.4",
-    "hiveCommon": "org.apache.hive:hive-common:" + hiveVersion,
-    "hiveService": "org.apache.hive:hive-service:" + hiveVersion,
-    "hiveJdbc": "org.apache.hive:hive-jdbc:" + hiveVersion,
-    "hiveMetastore": "org.apache.hive:hive-metastore:" + hiveVersion,
-    "hiveExec": "org.apache.hive:hive-exec:" + hiveVersion + ":core",
-    "hiveSerDe": "org.apache.hive:hive-serde:" + hiveVersion,
+    "hiveCommon": "com.linkedin.hive:hive-common:" + hiveVersion,
+    "hiveService": "com.linkedin.hive:hive-service:" + hiveVersion,
+    "hiveJdbc": "com.linkedin.hive:hive-jdbc:" + hiveVersion,
+    "hiveMetastore": "com.linkedin.hive:hive-metastore:" + hiveVersion,
+    "hiveExec": "com.linkedin.hive:hive-exec:" + hiveVersion + ":core",
+    "hiveSerDe": "com.linkedin.hive:hive-serde:" + hiveVersion,
     "hiveStorageApi": "org.apache.hive:hive-storage-api:2.4.0",
     "httpclient": "org.apache.httpcomponents:httpclient:4.5.2",
     "httpmime": "org.apache.httpcomponents:httpmime:4.5.2",
diff --git a/gradle/scripts/repositories.gradle 
b/gradle/scripts/repositories.gradle
index 363c0b1..096df1b 100644
--- a/gradle/scripts/repositories.gradle
+++ b/gradle/scripts/repositories.gradle
@@ -29,7 +29,11 @@ repositories {
   maven {
     url "https://linkedin.jfrog.io/artifactory/open-source/";
   }
+  maven {
+    url "https://linkedin.jfrog.io/artifactory/gobblin-hive/";
+  }
   jcenter()
+  mavenLocal()
 }
 
 try {

Reply via email to