This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch avro_1_9
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/avro_1_9 by this push:
new cae40b8 Changes for upgrade Avro 1.9.2 and leverages hive with avro
changes from https://linkedin.jfrog.io/artifactory/gobblin-hive (#3458)
cae40b8 is described below
commit cae40b87ce7846779380c0d000513ade31a4da0b
Author: Abhishek Nath <[email protected]>
AuthorDate: Tue Feb 15 09:19:08 2022 -0800
Changes for upgrade Avro 1.9.2 and leverages hive with avro changes from
https://linkedin.jfrog.io/artifactory/gobblin-hive (#3458)
---
.../avro/MRCompactorAvroKeyDedupJobRunner.java | 4 +-
gobblin-core-base/build.gradle | 1 +
.../converter/filter/AvroSchemaFieldRemover.java | 13 +++++--
...GobblinTrackingEventFlattenFilterConverter.java | 10 +++--
.../filter/AvroSchemaFieldRemoverTest.java | 18 +++++++--
...linTrackingEventFlattenFilterConverterTest.java | 10 +++--
.../resources/converter/recursive_schema_1.avsc | 2 +-
.../converter/recursive_schema_1_converted.avsc | 2 +-
.../resources/converter/recursive_schema_2.avsc | 2 +-
.../converter/recursive_schema_2_converted.avsc | 2 +-
.../recursive_schema_2_not_converted.avsc | 2 +-
gobblin-core/build.gradle | 1 +
.../converter/avro/FlattenNestedKeyConverter.java | 7 +++-
.../avro/JsonElementConversionFactory.java | 8 ++--
.../converter/filter/AvroFieldsPickConverter.java | 9 +++--
.../filter/AvroFieldsPickConverterTest.java | 2 +-
.../AvroGenericRecordAccessorTest.java | 5 +++
.../src/test/resources/converter/complex3.json | 12 +++---
.../test/resources/converter/fieldPickInput.avsc | 2 +-
.../resources/converter/fieldPickInput_arrays.avro | Bin 552 -> 510 bytes
gobblin-core/src/test/resources/serde/serde.avro | Bin 277 -> 293 bytes
gobblin-core/src/test/resources/serde/serde.avsc | 4 +-
.../AvroStringFieldEncryptorConverterTest.java | 5 +++
.../src/test/resources/fieldPickInput_arrays.avro | Bin 552 -> 510 bytes
gobblin-modules/gobblin-kafka-common/build.gradle | 1 +
.../converter/EnvelopePayloadConverter.java | 8 +++-
.../restli/throttling/PoliciesResourceTest.java | 2 +-
gobblin-utility/build.gradle | 2 +
.../org/apache/gobblin/util/AvroFlattener.java | 4 +-
.../java/org/apache/gobblin/util/AvroUtils.java | 42 +++++++++++++++------
.../org/apache/gobblin/util/AvroFlattenerTest.java | 3 +-
.../org/apache/gobblin/util/AvroUtilsTest.java | 24 ++++++------
.../optionWithinOptionWithinRecord_flattened.json | 9 +++--
.../recordWithinOptionWithinRecord_flattened.json | 6 ++-
gradle/scripts/defaultBuildProperties.gradle | 4 +-
gradle/scripts/dependencyDefinitions.gradle | 14 ++++---
gradle/scripts/repositories.gradle | 4 ++
37 files changed, 161 insertions(+), 83 deletions(-)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
index 2a2ab6b..82c891d 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
@@ -37,6 +37,7 @@ import
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import org.apache.commons.io.FilenameUtils;
import org.apache.gobblin.compaction.dataset.Dataset;
import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
@@ -198,7 +199,8 @@ public class MRCompactorAvroKeyDedupJobRunner extends
MRCompactorJobRunner {
for (Field field : record.getFields()) {
Optional<Schema> newFieldSchema = getKeySchema(field);
if (newFieldSchema.isPresent()) {
- fields.add(new Field(field.name(), newFieldSchema.get(), field.doc(),
field.defaultValue()));
+ fields.add(AvroCompatibilityHelper.createSchemaField(field.name(),
newFieldSchema.get(), field.doc(),
+ AvroUtils.getCompatibleDefaultValue(field)));
}
}
if (!fields.isEmpty()) {
diff --git a/gobblin-core-base/build.gradle b/gobblin-core-base/build.gradle
index fb7a4a7..83d1c9d 100644
--- a/gobblin-core-base/build.gradle
+++ b/gobblin-core-base/build.gradle
@@ -31,6 +31,7 @@ dependencies {
compile externalDependency.avroMapredH2
compile externalDependency.commonsCodec
compile externalDependency.avro
+ compile externalDependency.avroCompatHelper
compile externalDependency.guava
compile externalDependency.slf4j
compile externalDependency.typesafeConfig
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
index d8d227b..be5c613 100644
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
@@ -22,12 +22,15 @@ import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.gobblin.util.AvroUtils;
+
import static org.apache.gobblin.util.AvroUtils.convertFieldToSchemaWithProps;
@@ -118,11 +121,13 @@ public class AvroSchemaFieldRemover {
if (!this.shouldRemove(field)) {
Field newField;
if (this.children.containsKey(field.name())) {
- newField = new Field(field.name(),
this.children.get(field.name()).removeFields(field.schema(), schemaMap),
- field.doc(), field.defaultValue());
+ newField = AvroCompatibilityHelper.createSchemaField(field.name(),
+ this.children.get(field.name()).removeFields(field.schema(),
schemaMap),
+ field.doc(), AvroUtils.getCompatibleDefaultValue(field));
} else {
- newField = new Field(field.name(),
DO_NOTHING_INSTANCE.removeFields(field.schema(), schemaMap), field.doc(),
- field.defaultValue());
+ newField = AvroCompatibilityHelper.createSchemaField(field.name(),
+ DO_NOTHING_INSTANCE.removeFields(field.schema(), schemaMap),
field.doc(),
+ AvroUtils.getCompatibleDefaultValue(field));
}
// Avro 1.9 compatible change - replaced deprecated public api
getJsonProps with getObjectProps
for (Map.Entry<String, Object> objectEntry :
field.getObjectProps().entrySet()) {
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
index bcad4f9..c44e2f5 100644
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
@@ -26,6 +26,7 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -85,10 +86,11 @@ public class GobblinTrackingEventFlattenFilterConverter
extends AvroToAvroConver
String curFieldName = field.name();
if (!field.schema().getType().equals(Schema.Type.MAP)) {
if (fieldsRenameMap.containsKey(curFieldName)) {
- newFields.add(
- new Schema.Field(fieldsRenameMap.get(curFieldName),
field.schema(), field.doc(), field.defaultValue()));
+
newFields.add(AvroCompatibilityHelper.createSchemaField(fieldsRenameMap.get(curFieldName),
field.schema(),
+ field.doc(), AvroUtils.getCompatibleDefaultValue(field)));
} else {
- newFields.add(new Schema.Field(curFieldName, field.schema(),
field.doc(), field.defaultValue()));
+
newFields.add(AvroCompatibilityHelper.createSchemaField(curFieldName,
field.schema(), field.doc(),
+ AvroUtils.getCompatibleDefaultValue(field)));
}
this.nonMapFields.add(curFieldName);
} else {
@@ -102,7 +104,7 @@ public class GobblinTrackingEventFlattenFilterConverter
extends AvroToAvroConver
for (String fieldToFlatten : ConfigUtils.getStringList(config,
FIELDS_TO_FLATTEN)) {
String newFieldName =
this.fieldsRenameMap.containsKey(fieldToFlatten) ?
this.fieldsRenameMap.get(fieldToFlatten) : fieldToFlatten;
- newFields.add(new Field(newFieldName, Schema.create(Schema.Type.STRING),
"", null));
+ newFields.add(AvroCompatibilityHelper.createSchemaField(newFieldName,
Schema.create(Schema.Type.STRING), "", null));
}
return this;
diff --git
a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java
b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java
index 669d040..76aadbe 100644
---
a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java
+++
b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemoverTest.java
@@ -35,18 +35,30 @@ public class AvroSchemaFieldRemoverTest {
@Test
public void testRemoveFields() throws IllegalArgumentException, IOException {
+ // Avro 1.9.2 uses Object as default value and uses internal
JacksonUtils.toJson to convert the Object default value
+ // to JsonNode. Enum default value is not supported and hence the
conversion fails. Updated the input schema and
+ // removed the non null default value for enum.
Schema convertedSchema1 =
convertSchema("/converter/recursive_schema_1.avsc", "YwchQiH.OjuzrLOtmqLW");
Schema expectedSchema1 =
parseSchema("/converter/recursive_schema_1_converted.avsc");
- Assert.assertEquals(convertedSchema1, expectedSchema1);
+ Assert.assertEquals(convertedSchema1.toString(),
expectedSchema1.toString());
+ // Avro 1.9.2 uses Object as default value and uses internal
JacksonUtils.toJson to convert the Object default value
+ // to JsonNode. Default value of record type is not supported and hence
the conversion fails. Updated the input
+ // schema and removed the non null default value for the record. In earlier Avro
1.8.1 there was no such conversion.
+ // For example, remove the below default value.
+ // "default": {
+ // "AUjmPup": null,
+ // "IFBRClOa": "zG",
+ // "sZIVnwv": null
+ // }
Schema convertedSchema2 =
convertSchema("/converter/recursive_schema_2.avsc",
"FBuKC.wIINqII.lvaerUEKxBQUWg,eFQjDj.TzuYZajb");
Schema expectedSchema2 =
parseSchema("/converter/recursive_schema_2_converted.avsc");
- Assert.assertEquals(convertedSchema2, expectedSchema2);
+ Assert.assertEquals(convertedSchema2.toString(),
expectedSchema2.toString());
Schema convertedSchema3 =
convertSchema("/converter/recursive_schema_2.avsc",
"field.that.does.not.exist");
Schema expectedSchema3 =
parseSchema("/converter/recursive_schema_2_not_converted.avsc");
- Assert.assertEquals(convertedSchema3, expectedSchema3);
+ Assert.assertEquals(convertedSchema3.toString(),
expectedSchema3.toString());
}
private Schema parseSchema(String schemaFile) throws IOException {
diff --git
a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java
b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java
index 0989988..4c1577d 100644
---
a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java
+++
b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverterTest.java
@@ -45,12 +45,13 @@ public class GobblinTrackingEventFlattenFilterConverterTest
{
Schema output = converter.convertSchema(
new
Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")),
workUnitState);
- Assert.assertEquals(output, new Schema.Parser().parse(
+ Schema parsedSchema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"GobblinTrackingEvent\",\"namespace\":\"org.apache.gobblin.metrics\",\"fields\":"
+ "[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time at
which event was created.\",\"default\":0},"
+
"{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace
used for filtering of events.\"},"
+ "{\"name\":\"name\",\"type\":\"string\",\"doc\":\"Event
name.\"},{\"name\":\"field1\",\"type\":\"string\",\"doc\":\"\"},"
- + "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}"));
+ + "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}");
+ Assert.assertEquals(output.toString(), parsedSchema.toString());
props.put(GobblinTrackingEventFlattenFilterConverter.class.getSimpleName()
+ "."
+ GobblinTrackingEventFlattenFilterConverter.FIELDS_RENAME_MAP,
"name:eventName,field1:field3");
@@ -61,11 +62,12 @@ public class GobblinTrackingEventFlattenFilterConverterTest
{
Schema output2 = converter.convertSchema(
new
Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")),
workUnitState2);
- Assert.assertEquals(output2, new Schema.Parser().parse(
+ parsedSchema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"GobblinTrackingEvent\",\"namespace\":\"org.apache.gobblin.metrics\",\"fields\":"
+ "[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time at
which event was created.\",\"default\":0},"
+
"{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace
used for filtering of events.\"},"
+ "{\"name\":\"eventName\",\"type\":\"string\",\"doc\":\"Event
name.\"},{\"name\":\"field3\",\"type\":\"string\",\"doc\":\"\"},"
- + "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}"));
+ + "{\"name\":\"field2\",\"type\":\"string\",\"doc\":\"\"}]}");
+ Assert.assertEquals(output2.toString(), parsedSchema.toString());
}
}
diff --git
a/gobblin-core-base/src/test/resources/converter/recursive_schema_1.avsc
b/gobblin-core-base/src/test/resources/converter/recursive_schema_1.avsc
index 4b41e2d..3b0070a 100644
--- a/gobblin-core-base/src/test/resources/converter/recursive_schema_1.avsc
+++ b/gobblin-core-base/src/test/resources/converter/recursive_schema_1.avsc
@@ -1 +1 @@
-{"type":"record","name":"VyPswKoukcXEZshQrXnE","namespace":"PKA.POshikUo.flXRgM.aBxSQzgOe","fields":[{"name":"jRqjDF","type":{"type":"record","name":"ZDlOMWcUTCk","namespace":"PKA.POshikUo.flXRgM","fields":[{"name":"GJiZXGQc","type":"int","doc":"Vdj
NXTFcrls GsWTlJ Lw WS oWm ZbYt OxeGXFqHeG oWm gQpsyC NXTFcrls GsWTlJ ISn yqw
tVlMiJQo XECvKHj JUyZ BGwP PMEQNW yqw qfzNChDuzDx GwBGpC jM BGwP Ox r xOrwhzll
lIVyeB","meta":"field_meta"},{"name":"NZUOnQgci","type":["null","string"],"doc":"Vdj
N [...]
+{"type":"record","name":"VyPswKoukcXEZshQrXnE","namespace":"PKA.POshikUo.flXRgM.aBxSQzgOe","fields":[{"name":"jRqjDF","type":{"type":"record","name":"ZDlOMWcUTCk","namespace":"PKA.POshikUo.flXRgM","fields":[{"name":"GJiZXGQc","type":"int","doc":"Vdj
NXTFcrls GsWTlJ Lw WS oWm ZbYt OxeGXFqHeG oWm gQpsyC NXTFcrls GsWTlJ ISn yqw
tVlMiJQo XECvKHj JUyZ BGwP PMEQNW yqw qfzNChDuzDx GwBGpC jM BGwP Ox r xOrwhzll
lIVyeB","meta":"field_meta"},{"name":"NZUOnQgci","type":["null","string"],"doc":"Vdj
N [...]
diff --git
a/gobblin-core-base/src/test/resources/converter/recursive_schema_1_converted.avsc
b/gobblin-core-base/src/test/resources/converter/recursive_schema_1_converted.avsc
index 66a8fae..0b4b245 100644
---
a/gobblin-core-base/src/test/resources/converter/recursive_schema_1_converted.avsc
+++
b/gobblin-core-base/src/test/resources/converter/recursive_schema_1_converted.avsc
@@ -1 +1 @@
-{"type":"record","name":"VyPswKoukcXEZshQrXnE","namespace":"PKA.POshikUo.flXRgM.aBxSQzgOe","fields":[{"name":"jRqjDF","type":{"type":"record","name":"ZDlOMWcUTCk","namespace":"PKA.POshikUo.flXRgM","fields":[{"name":"GJiZXGQc","type":"int","doc":"Vdj
NXTFcrls GsWTlJ Lw WS oWm ZbYt OxeGXFqHeG oWm gQpsyC NXTFcrls GsWTlJ ISn yqw
tVlMiJQo XECvKHj JUyZ BGwP PMEQNW yqw qfzNChDuzDx GwBGpC jM BGwP Ox r xOrwhzll
lIVyeB","meta":"field_meta"},{"name":"NZUOnQgci","type":["null","string"],"doc":"Vdj
N [...]
+{"type":"record","name":"VyPswKoukcXEZshQrXnE","namespace":"PKA.POshikUo.flXRgM.aBxSQzgOe","fields":[{"name":"jRqjDF","type":{"type":"record","name":"ZDlOMWcUTCk","namespace":"PKA.POshikUo.flXRgM","fields":[{"name":"GJiZXGQc","type":"int","doc":"Vdj
NXTFcrls GsWTlJ Lw WS oWm ZbYt OxeGXFqHeG oWm gQpsyC NXTFcrls GsWTlJ ISn yqw
tVlMiJQo XECvKHj JUyZ BGwP PMEQNW yqw qfzNChDuzDx GwBGpC jM BGwP Ox r xOrwhzll
lIVyeB","meta":"field_meta"},{"name":"NZUOnQgci","type":["null","string"],"doc":"Vdj
N [...]
diff --git
a/gobblin-core-base/src/test/resources/converter/recursive_schema_2.avsc
b/gobblin-core-base/src/test/resources/converter/recursive_schema_2.avsc
index 5a15eae..c708ecb 100644
--- a/gobblin-core-base/src/test/resources/converter/recursive_schema_2.avsc
+++ b/gobblin-core-base/src/test/resources/converter/recursive_schema_2.avsc
@@ -1 +1 @@
-{"type":"record","name":"XQLFognzBcWX","namespace":"IoV.uIwHpaVy.dUV","fields":[{"name":"eFQjDj","type":["null",{"type":"record","name":"CWlSbaEOPjQ","fields":[{"name":"TzuYZajb","type":"int","doc":"BLD
isOtza xj fS jty qGGt PpaPkENfDJ jty
HTvfxa"},{"name":"zMMEITWsv","type":["null","string"],"doc":"Pnu fS jty qGGt
PpaPkENfDJ jty HTvfxa","default":null},{"name":"kTrp","type":"long","doc":"BLD
kTrp fS jty FBuKC"},{"name":"kdYkvr","type":"string","doc":"BLD name fS jty
kdYkvr"},{"name":"jf [...]
+{"type":"record","name":"XQLFognzBcWX","namespace":"IoV.uIwHpaVy.dUV","fields":[{"name":"eFQjDj","type":["null",{"type":"record","name":"CWlSbaEOPjQ","fields":[{"name":"TzuYZajb","type":"int","doc":"BLD
isOtza xj fS jty qGGt PpaPkENfDJ jty
HTvfxa"},{"name":"zMMEITWsv","type":["null","string"],"doc":"Pnu fS jty qGGt
PpaPkENfDJ jty HTvfxa","default":null},{"name":"kTrp","type":"long","doc":"BLD
kTrp fS jty FBuKC"},{"name":"kdYkvr","type":"string","doc":"BLD name fS jty
kdYkvr"},{"name":"jf [...]
diff --git
a/gobblin-core-base/src/test/resources/converter/recursive_schema_2_converted.avsc
b/gobblin-core-base/src/test/resources/converter/recursive_schema_2_converted.avsc
index a733019..8cff0df 100644
---
a/gobblin-core-base/src/test/resources/converter/recursive_schema_2_converted.avsc
+++
b/gobblin-core-base/src/test/resources/converter/recursive_schema_2_converted.avsc
@@ -1 +1 @@
-{"type":"record","name":"XQLFognzBcWX","namespace":"IoV.uIwHpaVy.dUV","fields":[{"name":"eFQjDj","type":["null",{"type":"record","name":"CWlSbaEOPjQ","fields":[{"name":"zMMEITWsv","type":["null","string"],"doc":"Pnu
fS jty qGGt PpaPkENfDJ jty
HTvfxa","default":null},{"name":"kTrp","type":"long","doc":"BLD kTrp fS jty
FBuKC"},{"name":"kdYkvr","type":"string","doc":"BLD name fS jty
kdYkvr"},{"name":"jfqWhtm","type":"string","doc":"BLD name fS jty
jfqWhtm"},{"name":"ORVhoRguhhZ","type":["nu [...]
\ No newline at end of file
+{"type":"record","name":"XQLFognzBcWX","namespace":"IoV.uIwHpaVy.dUV","fields":[{"name":"eFQjDj","type":["null",{"type":"record","name":"CWlSbaEOPjQ","fields":[{"name":"zMMEITWsv","type":["null","string"],"doc":"Pnu
fS jty qGGt PpaPkENfDJ jty
HTvfxa","default":null},{"name":"kTrp","type":"long","doc":"BLD kTrp fS jty
FBuKC"},{"name":"kdYkvr","type":"string","doc":"BLD name fS jty
kdYkvr"},{"name":"jfqWhtm","type":"string","doc":"BLD name fS jty
jfqWhtm"},{"name":"ORVhoRguhhZ","type":["nu [...]
\ No newline at end of file
diff --git
a/gobblin-core-base/src/test/resources/converter/recursive_schema_2_not_converted.avsc
b/gobblin-core-base/src/test/resources/converter/recursive_schema_2_not_converted.avsc
index c3130f9..81519b3 100644
---
a/gobblin-core-base/src/test/resources/converter/recursive_schema_2_not_converted.avsc
+++
b/gobblin-core-base/src/test/resources/converter/recursive_schema_2_not_converted.avsc
@@ -1 +1 @@
-{"type":"record","name":"XQLFognzBcWX","namespace":"IoV.uIwHpaVy.dUV","fields":[{"name":"eFQjDj","type":["null",{"type":"record","name":"CWlSbaEOPjQ","fields":[{"name":"TzuYZajb","type":"int","doc":"BLD
isOtza xj fS jty qGGt PpaPkENfDJ jty
HTvfxa"},{"name":"zMMEITWsv","type":["null","string"],"doc":"Pnu fS jty qGGt
PpaPkENfDJ jty HTvfxa","default":null},{"name":"kTrp","type":"long","doc":"BLD
kTrp fS jty FBuKC"},{"name":"kdYkvr","type":"string","doc":"BLD name fS jty
kdYkvr"},{"name":"jf [...]
\ No newline at end of file
+{"type":"record","name":"XQLFognzBcWX","namespace":"IoV.uIwHpaVy.dUV","fields":[{"name":"eFQjDj","type":["null",{"type":"record","name":"CWlSbaEOPjQ","fields":[{"name":"TzuYZajb","type":"int","doc":"BLD
isOtza xj fS jty qGGt PpaPkENfDJ jty
HTvfxa"},{"name":"zMMEITWsv","type":["null","string"],"doc":"Pnu fS jty qGGt
PpaPkENfDJ jty HTvfxa","default":null},{"name":"kTrp","type":"long","doc":"BLD
kTrp fS jty FBuKC"},{"name":"kdYkvr","type":"string","doc":"BLD name fS jty
kdYkvr"},{"name":"jf [...]
\ No newline at end of file
diff --git a/gobblin-core/build.gradle b/gobblin-core/build.gradle
index 9a0bd26..9ca2467 100644
--- a/gobblin-core/build.gradle
+++ b/gobblin-core/build.gradle
@@ -36,6 +36,7 @@ dependencies {
compile externalDependency.commonsMath
compile externalDependency.commonsHttpClient
compile externalDependency.avro
+ compile externalDependency.avroCompatHelper
compile externalDependency.guava
compile externalDependency.gson
compile externalDependency.slf4j
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/FlattenNestedKeyConverter.java
b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/FlattenNestedKeyConverter.java
index ccf0819..ec5309b 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/FlattenNestedKeyConverter.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/FlattenNestedKeyConverter.java
@@ -25,6 +25,7 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.google.common.base.CaseFormat;
import com.google.common.base.Optional;
@@ -75,7 +76,8 @@ public class FlattenNestedKeyConverter extends
Converter<Schema, Schema, Generic
List<Field> fields = new ArrayList<>();
// Clone the existing fields
for (Field field : inputSchema.getFields()) {
- fields.add(new Field(field.name(), field.schema(), field.doc(),
field.defaultValue(), field.order()));
+ fields.add(AvroCompatibilityHelper.createSchemaField(field.name(),
field.schema(), field.doc(),
+ AvroUtils.getCompatibleDefaultValue(field), field.order()));
}
// Convert each of nested keys into a top level field
@@ -102,7 +104,8 @@ public class FlattenNestedKeyConverter extends
Converter<Schema, Schema, Generic
Field field = optional.get();
// Make a copy under a new name
- Field copy = new Field(name, field.schema(), field.doc(),
field.defaultValue(), field.order());
+ Field copy = AvroCompatibilityHelper.createSchemaField(name,
field.schema(), field.doc(),
+ AvroUtils.getCompatibleDefaultValue(field), field.order());
fields.add(copy);
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
index 0d6707d..fbd05a6 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java
@@ -35,7 +35,6 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.EmptyIterable;
import org.apache.gobblin.converter.json.JsonSchema;
-import org.codehaus.jackson.node.JsonNodeFactory;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -45,6 +44,7 @@ import org.slf4j.LoggerFactory;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import lombok.extern.java.Log;
import sun.util.calendar.ZoneInfo;
@@ -618,8 +618,10 @@ public class JsonElementConversionFactory {
throw new UnsupportedOperationException(e);
}
- Schema.Field fld = new Schema.Field(map.getColumnName(), fldSchema,
map.getComment(),
- map.isNullable() ? JsonNodeFactory.instance.nullNode() : null);
+ // [Avro 1.9.2 upgrade] No need to pass
JsonNodeFactory.instance.nullNode() if map is nullable.
+ // AvroCompatibilityHelper will take care of this.
+ Schema.Field fld =
AvroCompatibilityHelper.createSchemaField(map.getColumnName(), fldSchema,
map.getComment(),
+ null);
fld.addProp(SOURCE_TYPE, sourceType);
fields.add(fld);
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
index 6f341fe..e1499ab 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java
@@ -26,6 +26,7 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,14 +145,14 @@ public class AvroFieldsPickConverter extends
AvroToAvroConverterBase {
Preconditions.checkNotNull(innerSrcField, child.val + " does not exist
under " + recordSchema);
if (child.children.isEmpty()) { //Leaf
- newFields.add(
- new Field(innerSrcField.name(), innerSrcField.schema(),
innerSrcField.doc(), innerSrcField.defaultValue()));
+
newFields.add(AvroCompatibilityHelper.createSchemaField(innerSrcField.name(),
innerSrcField.schema(),
+ innerSrcField.doc(),
AvroUtils.getCompatibleDefaultValue(innerSrcField)));
} else {
Schema innerSrcSchema = innerSrcField.schema();
Schema innerDestSchema = createSchemaHelper(innerSrcSchema, child);
//Recurse of schema
- Field innerDestField =
- new Field(innerSrcField.name(), innerDestSchema,
innerSrcField.doc(), innerSrcField.defaultValue());
+ Field innerDestField =
AvroCompatibilityHelper.createSchemaField(innerSrcField.name(), innerDestSchema,
+ innerSrcField.doc(),
AvroUtils.getCompatibleDefaultValue(innerSrcField));
newFields.add(innerDestField);
}
}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
index d1244e8..1bff418 100644
---
a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
+++
b/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java
@@ -88,7 +88,7 @@ public class AvroFieldsPickConverterTest {
while (expectedDataFileReader.hasNext()) {
GenericRecord expected = expectedDataFileReader.next();
GenericRecord actual = converter.convertRecord(convertedSchema,
srcDataFileReader.next(), workUnitState).iterator().next();
- Assert.assertEquals(actual, expected);
+ Assert.assertEquals(actual.toString(), expected.toString());
}
Assert.assertTrue(!srcDataFileReader.hasNext());
}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
index 17e46b0..8987561 100644
---
a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
+++
b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
@@ -139,6 +139,11 @@ public class AvroGenericRecordAccessorTest {
@Test
public void testGetMultiConvertsStrings() throws IOException {
+ // The error below is caused by an invalid Avro schema: a union field whose
default is null must list "null" first, followed by the
+ // actual type. This is corrected in the fieldPickInput.avsc file and
fieldPickInput_arrays.avro
+ // Error: org.apache.avro.AvroTypeException: Invalid default for field
favorite_quotes: null
+ // not a [{"type":"array","items":"string"},"null"]
+ // Correct data: "type": ["null", { "type": "array", "items": "string"}]
updateRecordFromTestResource("converter/fieldPickInput",
"converter/fieldPickInput_arrays.avro");
Map<String, Object> ret = accessor.getMultiGeneric("favorite_quotes");
Object val = ret.get("favorite_quotes");
diff --git a/gobblin-core/src/test/resources/converter/complex3.json
b/gobblin-core/src/test/resources/converter/complex3.json
index 6f57998..9872d51 100644
--- a/gobblin-core/src/test/resources/converter/complex3.json
+++ b/gobblin-core/src/test/resources/converter/complex3.json
@@ -13,8 +13,8 @@
"isNullable": true,
"dataType": {
"type": [
- "int",
- "null"
+ "null",
+ "int"
]
}
},
@@ -263,12 +263,12 @@
"name": "id",
"type": [
{
- "type": "int",
- "source.type": "int"
- },
- {
"type": "null",
"source.type": "null"
+ },
+ {
+ "type": "int",
+ "source.type": "int"
}
],
"doc": "System-assigned numeric user ID. Cannot be changed by
the user.",
diff --git a/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
index 4da3fdf..17e7fda 100644
--- a/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
+++ b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
@@ -8,6 +8,6 @@
{"name": "date_of_birth", "type": "long"},
{"name": "last_modified", "type": "long"},
{"name": "created", "type": "long"},
- {"name": "favorite_quotes", "type": [{ "type": "array", "items":
"string"}, "null"], "default": null}
+ {"name": "favorite_quotes", "type": ["null", { "type": "array", "items":
"string"}], "default": null}
]
}
diff --git
a/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro
b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro
index c10a607..13130a8 100644
Binary files
a/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro and
b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro differ
diff --git a/gobblin-core/src/test/resources/serde/serde.avro
b/gobblin-core/src/test/resources/serde/serde.avro
index fe23a8d..977e380 100644
Binary files a/gobblin-core/src/test/resources/serde/serde.avro and
b/gobblin-core/src/test/resources/serde/serde.avro differ
diff --git a/gobblin-core/src/test/resources/serde/serde.avsc
b/gobblin-core/src/test/resources/serde/serde.avsc
index 470827a..c7f3b55 100644
--- a/gobblin-core/src/test/resources/serde/serde.avsc
+++ b/gobblin-core/src/test/resources/serde/serde.avsc
@@ -3,7 +3,7 @@
"name": "User",
"fields": [
{"name": "name", "type": "string"},
- {"name": "favorite_number", "type": ["int", "null"]},
- {"name": "favorite_color", "type": ["string", "null"]}
+ {"name": "favorite_number", "type": ["null", "int"]},
+ {"name": "favorite_color", "type": ["null", "string"]}
]
}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
index d16766c..6bfc2ba 100644
---
a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
+++
b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
@@ -97,6 +97,11 @@ public class AvroStringFieldEncryptorConverterTest {
wuState.getJobState().setProp("converter.encrypt.algorithm",
"insecure_shift");
converter.init(wuState);
+ // The error below is caused by invalid Avro data: a union field whose
default is null must list "null" first,
+ // followed by the actual type. This is corrected in fieldPickInput.avsc and
fieldPickInput_arrays.avro
+ // Error: org.apache.avro.AvroTypeException: Invalid default for field
favorite_quotes: null
+ // not a [{"type":"array","items":"string"},"null"]
+ // Correct data: "type": ["null", { "type": "array", "items": "string"}]
GenericRecord inputRecord =
getRecordFromFile(getClass().getClassLoader().getResource("fieldPickInput_arrays.avro").getPath());
GenericArray origValues = (GenericArray)
inputRecord.get("favorite_quotes");
diff --git
a/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
index c10a607..13130a8 100644
Binary files
a/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
and
b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
differ
diff --git a/gobblin-modules/gobblin-kafka-common/build.gradle
b/gobblin-modules/gobblin-kafka-common/build.gradle
index 00954b9..d9a83e6 100644
--- a/gobblin-modules/gobblin-kafka-common/build.gradle
+++ b/gobblin-modules/gobblin-kafka-common/build.gradle
@@ -27,6 +27,7 @@ dependencies {
compile project(":gobblin-core-base")
compile externalDependency.avro
+ compile externalDependency.avroCompatHelper
compile externalDependency.confluentSchemaRegistryClient
compile externalDependency.commonsCodec
compile externalDependency.commonsHttpClient
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
index 6408a4c..198490a 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
@@ -24,7 +24,9 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.util.AvroUtils;
/**
@@ -70,7 +72,8 @@ public class EnvelopePayloadConverter extends
BaseEnvelopeSchemaConverter<Generi
return createLatestPayloadField(field);
}
// Make a copy of the field to the output schema
- return new Field(field.name(), field.schema(), field.doc(),
field.defaultValue(), field.order());
+ return AvroCompatibilityHelper.createSchemaField(field.name(),
field.schema(), field.doc(),
+ AvroUtils.getCompatibleDefaultValue(field), field.order());
}
/**
@@ -83,7 +86,8 @@ public class EnvelopePayloadConverter extends
BaseEnvelopeSchemaConverter<Generi
throws SchemaConversionException {
try {
Schema payloadSchema = fetchLatestPayloadSchema();
- return new Field(field.name(), payloadSchema, DECORATED_PAYLOAD_DOC,
field.defaultValue(), field.order());
+ return AvroCompatibilityHelper.createSchemaField(field.name(),
payloadSchema, DECORATED_PAYLOAD_DOC,
+ AvroUtils.getCompatibleDefaultValue(field), field.order());
} catch (Exception e) {
throw new SchemaConversionException(e);
}
diff --git
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/PoliciesResourceTest.java
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/PoliciesResourceTest.java
index c22062c..71629af 100644
---
a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/PoliciesResourceTest.java
+++
b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/PoliciesResourceTest.java
@@ -38,7 +38,7 @@ public class PoliciesResourceTest {
ThrottlingPolicyFactory factory = new ThrottlingPolicyFactory();
SharedLimiterKey res1key = new SharedLimiterKey("res1");
- Map<String, String> configMap =
avro.shaded.com.google.common.collect.ImmutableMap.<String, String>builder()
+ Map<String, String> configMap =
com.google.common.collect.ImmutableMap.<String, String>builder()
.put(BrokerConfigurationKeyGenerator.generateKey(factory, res1key,
null, ThrottlingPolicyFactory.POLICY_KEY),
CountBasedPolicy.FACTORY_ALIAS)
.put(BrokerConfigurationKeyGenerator.generateKey(factory, res1key,
null, CountBasedPolicy.COUNT_KEY), "100")
diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle
index 78f87c5..17fb469 100644
--- a/gobblin-utility/build.gradle
+++ b/gobblin-utility/build.gradle
@@ -30,6 +30,8 @@ dependencies {
compile externalDependency.guava
compile externalDependency.slf4j
compile externalDependency.avro
+ compile externalDependency.avroCompiler
+ compile externalDependency.avroCompatHelper
compile externalDependency.hiveMetastore
compile externalDependency.jodaTime
compile externalDependency.jacksonCore
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
index 7a15357..8d1a295 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
@@ -411,7 +412,8 @@ public class AvroFlattener {
}
}
}
- Schema.Field field = new Schema.Field(flattenName,
flattenedFieldSchema, f.doc(), f.defaultValue(), f.order());
+ Schema.Field field =
AvroCompatibilityHelper.createSchemaField(flattenName, flattenedFieldSchema,
f.doc(),
+ AvroUtils.getCompatibleDefaultValue(f), f.order());
if (StringUtils.isNotBlank(flattenSource)) {
field.addProp(FLATTENED_SOURCE_KEY, flattenSource);
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index d976ea7..e657b37 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -51,6 +51,7 @@ import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.mapred.FsInput;
import org.apache.avro.util.Utf8;
+import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
@@ -77,6 +78,7 @@ import com.google.common.collect.Maps;
import com.google.common.io.Closer;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
@@ -134,8 +136,9 @@ public class AvroUtils {
public static List<Field> deepCopySchemaFields(Schema readerSchema) {
return readerSchema.getFields().stream()
.map(field -> {
- Field f = new Field(field.name(), field.schema(), field.doc(),
field.defaultValue(), field.order());
- field.getProps().forEach((key, value) -> f.addProp(key, value));
+ Field f = AvroCompatibilityHelper.createSchemaField(field.name(),
field.schema(), field.doc(),
+ getCompatibleDefaultValue(field), field.order());
+ field.getObjectProps().forEach((key, value) -> f.addProp(key,
value));
return f;
})
.collect(Collectors.toList());
@@ -604,7 +607,8 @@ public class AvroUtils {
List<Field> combinedFields = Lists.newArrayList();
for (Field newFld : newSchema.getFields()) {
- combinedFields.add(new Field(newFld.name(), newFld.schema(),
newFld.doc(), newFld.defaultValue()));
+
combinedFields.add(AvroCompatibilityHelper.createSchemaField(newFld.name(),
newFld.schema(), newFld.doc(),
+ getCompatibleDefaultValue(newFld)));
}
for (Field oldFld : oldSchema.getFields()) {
@@ -619,12 +623,15 @@ public class AvroUtils {
}
}
Schema newFldSchema = Schema.createUnion(union);
- combinedFields.add(new Field(oldFld.name(), newFldSchema,
oldFld.doc(), oldFld.defaultValue()));
+
combinedFields.add(AvroCompatibilityHelper.createSchemaField(oldFld.name(),
newFldSchema, oldFld.doc(),
+ getCompatibleDefaultValue(oldFld)));
} else {
union.add(Schema.create(Type.NULL));
union.add(oldFldSchema);
Schema newFldSchema = Schema.createUnion(union);
- combinedFields.add(new Field(oldFld.name(), newFldSchema,
oldFld.doc(), oldFld.defaultValue()));
+ Object obj = getCompatibleDefaultValue(oldFld);
+
combinedFields.add(AvroCompatibilityHelper.createSchemaField(oldFld.name(),
newFldSchema, oldFld.doc(),
+ getCompatibleDefaultValue(oldFld)));
}
}
}
@@ -673,7 +680,8 @@ public class AvroUtils {
for (Field field : record.getFields()) {
Optional<Schema> newFieldSchema =
removeUncomparableFields(field.schema(), processed);
if (newFieldSchema.isPresent()) {
- fields.add(new Field(field.name(), newFieldSchema.get(), field.doc(),
field.defaultValue()));
+ fields.add(AvroCompatibilityHelper.createSchemaField(field.name(),
newFieldSchema.get(), field.doc(),
+ getCompatibleDefaultValue(field)));
}
}
@@ -733,7 +741,8 @@ public class AvroUtils {
if (null == input) {
return null;
}
- Field field = new Field(input.name(), input.schema(), input.doc(),
input.defaultValue(), input.order());
+ Field field = AvroCompatibilityHelper.createSchemaField(input.name(),
input.schema(), input.doc(),
+ getCompatibleDefaultValue(input), input.order());
return field;
}
});
@@ -776,8 +785,8 @@ public class AvroUtils {
List<Schema.Field> newFields = new ArrayList<>();
if (schema.getFields().size() > 0) {
for (Schema.Field oldField : schema.getFields()) {
- Field newField = new Field(oldField.name(),
switchNamespace(oldField.schema(), namespaceOverride), oldField.doc(),
- oldField.defaultValue(), oldField.order());
+ Field newField =
AvroCompatibilityHelper.createSchemaField(oldField.name(),
switchNamespace(oldField.schema(),
+ namespaceOverride), oldField.doc(),
getCompatibleDefaultValue(oldField), oldField.order());
// Copy field level properties
copyFieldProperties(oldField, newField);
newFields.add(newField);
@@ -1088,8 +1097,8 @@ public class AvroUtils {
Schema copiedFieldSchema = dropRecursive(fieldSchemaEntry,
newParents, fieldsWithRecursion);
if (copiedFieldSchema == null) {
} else {
- Schema.Field copiedField =
- new Schema.Field(field.name(), copiedFieldSchema, field.doc(),
field.defaultValue(), field.order());
+ Schema.Field copiedField =
AvroCompatibilityHelper.createSchemaField(field.name(), copiedFieldSchema,
+ field.doc(), getCompatibleDefaultValue(field), field.order());
copyFieldProperties(field, copiedField);
copiedSchemaFields.add(copiedField);
}
@@ -1142,7 +1151,16 @@ public class AvroUtils {
* @param copiedField
*/
private static void copyFieldProperties(Schema.Field sourceField,
Schema.Field copiedField) {
- sourceField.getProps().forEach((key, value) -> copiedField.addProp(key,
value));
+ sourceField.getObjectProps().forEach((key, value) ->
copiedField.addProp(key, value));
}
+ @Nullable
+ public static Object getCompatibleDefaultValue(Schema.Field field) {
+ return AvroCompatibilityHelper.fieldHasDefault(field)
+ ? AvroCompatibilityHelper.getGenericDefaultValue(field)
+ : null;
+ }
+
+
+
}
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroFlattenerTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroFlattenerTest.java
index 521bda2..c6889da 100644
---
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroFlattenerTest.java
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroFlattenerTest.java
@@ -114,8 +114,7 @@ public class AvroFlattenerTest {
Schema originalSchema =
readSchemaFromJsonFile("optionWithinOptionWithinRecord_original.json");
Schema expectedSchema =
readSchemaFromJsonFile("optionWithinOptionWithinRecord_flattened.json");
-
- Assert.assertEquals(new AvroFlattener().flatten(originalSchema, false),
expectedSchema);
+ Assert.assertEquals(new AvroFlattener().flatten(originalSchema,
false).toString(), expectedSchema.toString());
}
/**
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 6f18a35..931ac34 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -49,9 +49,9 @@ import org.apache.avro.util.internal.JacksonUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ArrayNode;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -101,7 +101,7 @@ public class AvroUtilsTest {
Schema expectedOutputSchema1 =
new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"test\", "
+ "\"fields\":["
- + "{\"name\": \"name\", \"type\": \"string\"}, " + "{\"name\":
\"number\", \"type\": [\"null\", \"int\"]}"
+ + "{\"name\": \"name\", \"type\": \"string\"}, " + "{\"name\":
\"number\", \"type\": [\"null\", \"int\"],\"default\":null}]}"
+ "]}");
Assert.assertEquals(expectedOutputSchema1,
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema1, newSchema1));
@@ -118,7 +118,7 @@ public class AvroUtilsTest {
Schema expectedOutputSchema2 =
new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"test\", "
+ "\"fields\":["
+ "{\"name\": \"name\", \"type\": \"string\"}, "
- + "{\"name\": \"number\", \"type\": [\"null\", {\"type\":
\"array\", \"items\": \"string\"}]}" + "]}");
+ + "{\"name\": \"number\", \"type\": [\"null\", {\"type\":
\"array\", \"items\": \"string\"}],\"default\":null}]}" + "]}");
Assert.assertEquals(expectedOutputSchema2,
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema2, newSchema2));
}
@@ -146,10 +146,10 @@ public class AvroUtilsTest {
.parse("{\"type\":\"record\", \"name\":\"test\", "
+ "\"fields\":["
+ "{\"name\": \"name\", \"type\": \"string\"}, "
- + "{\"name\": \"number\", \"type\": [\"null\", {\"type\":
\"string\"}, {\"type\": \"array\", \"items\": \"string\"}]}"
+ + "{\"name\": \"number\", \"type\": [\"null\", {\"type\":
\"string\"}, {\"type\": \"array\", \"items\": \"string\"}], \"default\":
null}]}"
+ "]}");
- Assert.assertEquals(expectedOutputSchema1,
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema1, newSchema1));
+ Assert.assertEquals(expectedOutputSchema1.toString(),
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema1, newSchema1).toString());
Schema oldSchema2 =
new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"test\", "
+ "\"fields\":["
@@ -163,9 +163,9 @@ public class AvroUtilsTest {
Schema expectedOutputSchema2 =
new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"test\", "
+ "\"fields\":["
+ "{\"name\": \"name\", \"type\": \"string\"}, "
- + "{\"name\": \"number\", \"type\": [\"null\", {\"type\":
\"array\", \"items\": \"string\"}]}" + "]}");
+ + "{\"name\": \"number\", \"type\": [\"null\", {\"type\":
\"array\", \"items\": \"string\"}], \"default\": null}" + "]}");
- Assert.assertEquals(expectedOutputSchema2,
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema2, newSchema2));
+ Assert.assertEquals(expectedOutputSchema2.toString(),
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema2, newSchema2).toString());
}
/**
@@ -190,10 +190,10 @@ public class AvroUtilsTest {
.parse("{\"type\":\"record\", \"name\":\"test\", "
+ "\"fields\":["
+ "{\"name\": \"name\", \"type\": \"string\"}, "
- + "{\"name\": \"color\", \"type\": [\"null\", \"string\"]}, "
- + "{\"name\": \"number\", \"type\": [\"null\", {\"type\":
\"string\"}, {\"type\": \"array\", \"items\": \"string\"}]}"
+ + "{\"name\": \"color\", \"type\": [\"null\", \"string\"],
\"default\": null}, "
+ + "{\"name\": \"number\", \"type\": [\"null\", {\"type\":
\"string\"}, {\"type\": \"array\", \"items\": \"string\"}], \"default\":
null}]}"
+ "]}");
- Assert.assertEquals(expectedOutputSchema,
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema, newSchema));
+ Assert.assertEquals(expectedOutputSchema.toString(),
AvroUtils.nullifyFieldsForSchemaMerge(oldSchema, newSchema).toString());
}
/**
diff --git
a/gobblin-utility/src/test/resources/flattenAvro/optionWithinOptionWithinRecord_flattened.json
b/gobblin-utility/src/test/resources/flattenAvro/optionWithinOptionWithinRecord_flattened.json
index dc4e37d..c2dd3a6 100644
---
a/gobblin-utility/src/test/resources/flattenAvro/optionWithinOptionWithinRecord_flattened.json
+++
b/gobblin-utility/src/test/resources/flattenAvro/optionWithinOptionWithinRecord_flattened.json
@@ -4,15 +4,18 @@
"fields" : [ {
"name" :
"parentFieldUnion__unionRecordMemberFieldUnion__superNestedFieldString1",
"type" : [ "null", "string" ],
- "flatten_source" :
"parentFieldUnion.unionRecordMemberFieldUnion.superNestedFieldString1"
+ "flatten_source" :
"parentFieldUnion.unionRecordMemberFieldUnion.superNestedFieldString1",
+ "default": null
}, {
"name" :
"parentFieldUnion__unionRecordMemberFieldUnion__superNestedFieldString2",
"type" : [ "null", "string" ],
- "flatten_source" :
"parentFieldUnion.unionRecordMemberFieldUnion.superNestedFieldString2"
+ "flatten_source" :
"parentFieldUnion.unionRecordMemberFieldUnion.superNestedFieldString2",
+ "default": null
}, {
"name" : "parentFieldUnion__unionRecordMemberFieldString",
"type" : [ "null", "string" ],
- "flatten_source" : "parentFieldUnion.unionRecordMemberFieldString"
+ "flatten_source" : "parentFieldUnion.unionRecordMemberFieldString",
+ "default": null
}, {
"name" : "parentFieldInt",
"type" : "int"
diff --git
a/gobblin-utility/src/test/resources/flattenAvro/recordWithinOptionWithinRecord_flattened.json
b/gobblin-utility/src/test/resources/flattenAvro/recordWithinOptionWithinRecord_flattened.json
index f6ca3cf..9b6c8b9 100644
---
a/gobblin-utility/src/test/resources/flattenAvro/recordWithinOptionWithinRecord_flattened.json
+++
b/gobblin-utility/src/test/resources/flattenAvro/recordWithinOptionWithinRecord_flattened.json
@@ -4,11 +4,13 @@
"fields" : [ {
"name" : "parentFieldUnion__unionRecordMemberFieldLong",
"type" : [ "null", "long" ],
- "flatten_source" : "parentFieldUnion.unionRecordMemberFieldLong"
+ "flatten_source" : "parentFieldUnion.unionRecordMemberFieldLong",
+ "default": null
}, {
"name" : "parentFieldUnion__unionRecordMemberFieldString",
"type" : [ "null", "string" ],
- "flatten_source" : "parentFieldUnion.unionRecordMemberFieldString"
+ "flatten_source" : "parentFieldUnion.unionRecordMemberFieldString",
+ "default": null
}, {
"name" : "parentFieldInt",
"type" : "int"
diff --git a/gradle/scripts/defaultBuildProperties.gradle
b/gradle/scripts/defaultBuildProperties.gradle
index fec6ef9..3d8ce31 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -23,14 +23,14 @@ def BuildProperties BUILD_PROPERTIES = new
BuildProperties(project)
.register(new BuildProperty("sonatypeArtifactSnapshotRepository",
"https://oss.sonatype.org/content/repositories/snapshots/", "Maven repository
to publish artifacts"))
.register(new BuildProperty("nexusArtifactRepository",
"https://repository.apache.org/service/local/staging/deploy/maven2", "Maven
repository to publish artifacts"))
.register(new BuildProperty("nexusArtifactSnapshotRepository",
"https://repository.apache.org/content/repositories/snapshots", "Maven
repository to publish artifacts"))
- .register(new BuildProperty("avroVersion", "1.8.1", "Avro dependencies
version"))
+ .register(new BuildProperty("avroVersion", "1.9.2", "Avro dependencies
version"))
.register(new BuildProperty("awsVersion", "1.11.8", "AWS dependencies
version"))
.register(new BuildProperty("bytemanVersion", "4.0.5", "Byteman
dependencies version"))
.register(new BuildProperty("confluentVersion", "2.0.1", "confluent
dependencies version"))
.register(new BuildProperty("doNotSignArtifacts", false, "Do not sight
Maven artifacts"))
.register(new BuildProperty("gobblinFlavor", "standard", "Build flavor
(see http://gobblin.readthedocs.io/en/latest/developer-guide/GobblinModules/)"))
.register(new BuildProperty("hadoopVersion", "2.3.0", "Hadoop dependencies
version"))
- .register(new BuildProperty("hiveVersion", "1.0.1", "Hive dependencies
version"))
+ .register(new BuildProperty("hiveVersion", "1.0.1-avro", "Hive
dependencies version"))
.register(new BuildProperty("icebergVersion", "0.10.0", "Iceberg
dependencies version"))
.register(new BuildProperty("jdkVersion",
JavaVersion.VERSION_1_8.toString(),
"Java languange compatibility; supported versions: " +
JavaVersion.VERSION_1_8))
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index 670a0d0..b8ac319 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -25,6 +25,8 @@ ext.externalDependency = [
"antlrRuntime": "org.antlr:antlr-runtime:3.5.2",
"avro": "org.apache.avro:avro:" + avroVersion,
"avroMapredH2": "org.apache.avro:avro-mapred:" + avroVersion,
+ "avroCompatHelper": "com.linkedin.avroutil1:helper-all:0.2.71",
+ "avroCompiler": "org.apache.avro:avro-compiler:" + avroVersion,
"awsCore": "com.amazonaws:aws-java-sdk-core:" + awsVersion,
"awsAsg": "com.amazonaws:aws-java-sdk-autoscaling:" + awsVersion,
"awsAppAsg": "com.amazonaws:aws-java-sdk-applicationautoscaling:" +
awsVersion,
@@ -66,12 +68,12 @@ ext.externalDependency = [
"hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0",
"hdrHistogram": "org.hdrhistogram:HdrHistogram:2.1.11",
"helix": "org.apache.helix:helix-core:0.9.4",
- "hiveCommon": "org.apache.hive:hive-common:" + hiveVersion,
- "hiveService": "org.apache.hive:hive-service:" + hiveVersion,
- "hiveJdbc": "org.apache.hive:hive-jdbc:" + hiveVersion,
- "hiveMetastore": "org.apache.hive:hive-metastore:" + hiveVersion,
- "hiveExec": "org.apache.hive:hive-exec:" + hiveVersion + ":core",
- "hiveSerDe": "org.apache.hive:hive-serde:" + hiveVersion,
+ "hiveCommon": "com.linkedin.hive:hive-common:" + hiveVersion,
+ "hiveService": "com.linkedin.hive:hive-service:" + hiveVersion,
+ "hiveJdbc": "com.linkedin.hive:hive-jdbc:" + hiveVersion,
+ "hiveMetastore": "com.linkedin.hive:hive-metastore:" + hiveVersion,
+ "hiveExec": "com.linkedin.hive:hive-exec:" + hiveVersion + ":core",
+ "hiveSerDe": "com.linkedin.hive:hive-serde:" + hiveVersion,
"hiveStorageApi": "org.apache.hive:hive-storage-api:2.4.0",
"httpclient": "org.apache.httpcomponents:httpclient:4.5.2",
"httpmime": "org.apache.httpcomponents:httpmime:4.5.2",
diff --git a/gradle/scripts/repositories.gradle
b/gradle/scripts/repositories.gradle
index 363c0b1..096df1b 100644
--- a/gradle/scripts/repositories.gradle
+++ b/gradle/scripts/repositories.gradle
@@ -29,7 +29,11 @@ repositories {
maven {
url "https://linkedin.jfrog.io/artifactory/open-source/"
}
+ maven {
+ url "https://linkedin.jfrog.io/artifactory/gobblin-hive/"
+ }
jcenter()
+ mavenLocal()
}
try {