This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new db210ef [GOBBLIN-686] Enhance schema comparison
db210ef is described below
commit db210eff5a6e6ddc58c966d6486c0d203baf54e2
Author: Arjun <[email protected]>
AuthorDate: Wed Feb 20 14:07:41 2019 -0800
[GOBBLIN-686] Enhance schema comparison
Closes #2558 from
arjun4084346/enhanceSchemaComparison
---
...GobblinTrackingEventFlattenFilterConverter.java | 3 ++-
.../gobblin/converter/AvroHttpJoinConverter.java | 7 ++----
.../java/org/apache/gobblin/util/AvroUtils.java | 27 ++++++++++++++++++++++
.../org/apache/gobblin/util/AvroUtilsTest.java | 11 +++++++++
4 files changed, 42 insertions(+), 6 deletions(-)
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
index 00c1baa..a1d6b64 100644
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -111,7 +112,7 @@ public class GobblinTrackingEventFlattenFilterConverter
extends AvroToAvroConver
@Override
public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit)
throws SchemaConversionException {
-
Preconditions.checkArgument(inputSchema.getFields().equals(gobblinTrackingEventSchema.getFields()));
+
Preconditions.checkArgument(AvroUtils.checkReaderWriterCompatibility(gobblinTrackingEventSchema,
inputSchema, true));
Schema outputSchema = Schema
.createRecord(ConfigUtils.getString(config, NEW_SCHEMA_NAME,
inputSchema.getName()), inputSchema.getDoc(),
inputSchema.getNamespace(), inputSchema.isError());
diff --git
a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/converter/AvroHttpJoinConverter.java
b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/converter/AvroHttpJoinConverter.java
index e742316..d89ada1 100644
---
a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/converter/AvroHttpJoinConverter.java
+++
b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/converter/AvroHttpJoinConverter.java
@@ -34,6 +34,7 @@ import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.http.HttpOperation;
import org.apache.gobblin.http.HttpRequestResponseRecord;
import org.apache.gobblin.http.ResponseStatus;
+import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.utils.HttpUtils;
@@ -58,11 +59,7 @@ public abstract class AvroHttpJoinConverter<RQ, RP> extends
AsyncHttpJoinConvert
throw new SchemaConversionException("input schema is empty");
}
- List<Schema.Field> fields = Lists.newArrayList();
- for (Schema.Field field : inputSchema.getFields()) {
- Schema.Field newField = new Schema.Field(field.name(), field.schema(),
field.doc(), field.defaultValue(), field.order());
- fields.add(newField);
- }
+ List<Schema.Field> fields = AvroUtils.deepCopySchemaFields(inputSchema);
Schema.Field requestResponseField = new
Schema.Field(HTTP_REQUEST_RESPONSE_FIELD,
HttpRequestResponseRecord.getClassSchema(), "http output schema contains
request url and return result", null);
fields.add(requestResponseField);
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 36a88bb..242dcd8 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -28,11 +28,13 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
+import org.apache.avro.SchemaCompatibility;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericData.Record;
@@ -85,6 +87,31 @@ public class AvroUtils {
private static final String AVRO_SUFFIX = ".avro";
+ /**
+ * Validates that the provided reader schema can be used to decode avro data
written with the
+ * provided writer schema.
+ * @param readerSchema the schema that will be used to read (decode) the data.
+ * @param writerSchema the schema with which the data was originally written.
+ * @param ignoreNamespace whether name and namespace should be ignored in
validation
+ * @return true if validation passes
+ */
+ public static boolean checkReaderWriterCompatibility(Schema readerSchema,
Schema writerSchema, boolean ignoreNamespace) {
+ if (ignoreNamespace) {
+ List<Schema.Field> fields = deepCopySchemaFields(readerSchema);
+ readerSchema = Schema.createRecord(writerSchema.getName(),
writerSchema.getDoc(), writerSchema.getNamespace(),
+ readerSchema.isError());
+ readerSchema.setFields(fields);
+ }
+
+ return SchemaCompatibility.checkReaderWriterCompatibility(readerSchema,
writerSchema).getType().equals(SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE);
+ }
+
+ public static List<Field> deepCopySchemaFields(Schema readerSchema) {
+ return readerSchema.getFields().stream()
+ .map(field -> new Field(field.name(), field.schema(), field.doc(),
field.defaultValue(), field.order()))
+ .collect(Collectors.toList());
+ }
+
public static class AvroPathFilter implements PathFilter {
@Override
public boolean accept(Path path) {
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 2188a46..627fc60 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -46,6 +46,17 @@ public class AvroUtilsTest {
private static final String AVRO_DIR =
"gobblin-utility/src/test/resources/avroDirParent/";
@Test
+ public void testSchemaCompatiability() {
+ Schema readerSchema = new
Schema.Parser().parse("{\"type\":\"record\",\"name\":\"GobblinTrackingEvent_GaaS2\",\"namespace\":\"gobblin.metrics\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time
at which event was
created.\",\"default\":0},{\"name\":\"namespace\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\"},\"null\"],\"doc\":\"Namespace
used for filtering of
events.\"},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\
[...]
+ Schema writerSchema1 = new
Schema.Parser().parse("{\"type\":\"record\",\"name\":\"GobblinTrackingEvent\",\"namespace\":\"org.apache.gobblin.metrics\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time
at which event was
created.\",\"default\":0},{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace
used for filtering of
events.\"},{\"name\":\"name\",\"type\":\"string\",\"doc\":\"Event
name.\"},{\"name\":\"metadata\",\"type\":{\"type\":\"map\",\" [...]
+ Schema writerSchema2 = new
Schema.Parser().parse("{\"type\":\"record\",\"name\":\"GobblinTrackingEvent\",\"namespace\":\"org.apache.gobblin.metrics\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time
at which event was
created.\",\"default\":0},{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace
used for filtering of
events.\"},{\"name\":\"name2\",\"type\":\"string\",\"doc\":\"Event
name.\"},{\"name\":\"metadata\",\"type\":{\"type\":\"map\",\ [...]
+
+ Assert.assertTrue(AvroUtils.checkReaderWriterCompatibility(readerSchema,
writerSchema1, true));
+ Assert.assertFalse(AvroUtils.checkReaderWriterCompatibility(readerSchema,
writerSchema1, false));
+ Assert.assertFalse(AvroUtils.checkReaderWriterCompatibility(readerSchema,
writerSchema2, true));
+ }
+
+ @Test
public void testGetDirectorySchema() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.default.name", "file:///");