This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new db210ef  [GOBBLIN-686] Enhance schema comparison
db210ef is described below

commit db210eff5a6e6ddc58c966d6486c0d203baf54e2
Author: Arjun <[email protected]>
AuthorDate: Wed Feb 20 14:07:41 2019 -0800

    [GOBBLIN-686] Enhance schema comparison
    
    Closes #2558 from
    arjun4084346/enhanceSchemaComparison
---
 ...GobblinTrackingEventFlattenFilterConverter.java |  3 ++-
 .../gobblin/converter/AvroHttpJoinConverter.java   |  7 ++----
 .../java/org/apache/gobblin/util/AvroUtils.java    | 27 ++++++++++++++++++++++
 .../org/apache/gobblin/util/AvroUtilsTest.java     | 11 +++++++++
 4 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
index 00c1baa..a1d6b64 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 
@@ -111,7 +112,7 @@ public class GobblinTrackingEventFlattenFilterConverter extends AvroToAvroConver
   @Override
   public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit)
       throws SchemaConversionException {
-    Preconditions.checkArgument(inputSchema.getFields().equals(gobblinTrackingEventSchema.getFields()));
+    Preconditions.checkArgument(AvroUtils.checkReaderWriterCompatibility(gobblinTrackingEventSchema, inputSchema, true));
     Schema outputSchema = Schema
         .createRecord(ConfigUtils.getString(config, NEW_SCHEMA_NAME, inputSchema.getName()), inputSchema.getDoc(),
             inputSchema.getNamespace(), inputSchema.isError());
diff --git a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/converter/AvroHttpJoinConverter.java b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/converter/AvroHttpJoinConverter.java
index e742316..d89ada1 100644
--- a/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/converter/AvroHttpJoinConverter.java
+++ b/gobblin-modules/gobblin-http/src/main/java/org/apache/gobblin/converter/AvroHttpJoinConverter.java
@@ -34,6 +34,7 @@ import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.http.HttpOperation;
 import org.apache.gobblin.http.HttpRequestResponseRecord;
 import org.apache.gobblin.http.ResponseStatus;
+import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.utils.HttpUtils;
 
 
@@ -58,11 +59,7 @@ public abstract class AvroHttpJoinConverter<RQ, RP> extends AsyncHttpJoinConvert
       throw new SchemaConversionException("input schema is empty");
     }
 
-    List<Schema.Field> fields = Lists.newArrayList();
-    for (Schema.Field field : inputSchema.getFields()) {
-      Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order());
-      fields.add(newField);
-    }
+    List<Schema.Field> fields = AvroUtils.deepCopySchemaFields(inputSchema);
 
     Schema.Field requestResponseField = new Schema.Field(HTTP_REQUEST_RESPONSE_FIELD, HttpRequestResponseRecord.getClassSchema(), "http output schema contains request url and return result", null);
     fields.add(requestResponseField);
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 36a88bb..242dcd8 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -28,11 +28,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
+import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.SeekableInput;
 import org.apache.avro.generic.GenericData.Record;
@@ -85,6 +87,31 @@ public class AvroUtils {
 
   private static final String AVRO_SUFFIX = ".avro";
 
+  /**
+   * Validates that the provided reader schema can be used to decode avro data written with the
+   * provided writer schema.
+   * @param readerSchema schema to check.
+   * @param writerSchema schema to check.
+   * @param ignoreNamespace whether name and namespace should be ignored in validation
+   * @return true if validation passes
+   */
+  public static boolean checkReaderWriterCompatibility(Schema readerSchema, Schema writerSchema, boolean ignoreNamespace) {
+    if (ignoreNamespace) {
+      List<Schema.Field> fields = deepCopySchemaFields(readerSchema);
+      readerSchema = Schema.createRecord(writerSchema.getName(), writerSchema.getDoc(), writerSchema.getNamespace(),
+          readerSchema.isError());
+      readerSchema.setFields(fields);
+    }
+
+    return SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema).getType().equals(SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE);
+  }
+
+  public static List<Field> deepCopySchemaFields(Schema readerSchema) {
+    return readerSchema.getFields().stream()
+        .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order()))
+        .collect(Collectors.toList());
+  }
+
   public static class AvroPathFilter implements PathFilter {
     @Override
     public boolean accept(Path path) {
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 2188a46..627fc60 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -46,6 +46,17 @@ public class AvroUtilsTest {
   private static final String AVRO_DIR = "gobblin-utility/src/test/resources/avroDirParent/";
 
   @Test
+  public void testSchemaCompatiability() {
+    Schema readerSchema = new 
Schema.Parser().parse("{\"type\":\"record\",\"name\":\"GobblinTrackingEvent_GaaS2\",\"namespace\":\"gobblin.metrics\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time
 at which event was 
created.\",\"default\":0},{\"name\":\"namespace\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\"},\"null\"],\"doc\":\"Namespace
 used for filtering of 
events.\"},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\
 [...]
+    Schema writerSchema1 = new 
Schema.Parser().parse("{\"type\":\"record\",\"name\":\"GobblinTrackingEvent\",\"namespace\":\"org.apache.gobblin.metrics\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time
 at which event was 
created.\",\"default\":0},{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace
 used for filtering of 
events.\"},{\"name\":\"name\",\"type\":\"string\",\"doc\":\"Event 
name.\"},{\"name\":\"metadata\",\"type\":{\"type\":\"map\",\" [...]
+    Schema writerSchema2 = new 
Schema.Parser().parse("{\"type\":\"record\",\"name\":\"GobblinTrackingEvent\",\"namespace\":\"org.apache.gobblin.metrics\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time
 at which event was 
created.\",\"default\":0},{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace
 used for filtering of 
events.\"},{\"name\":\"name2\",\"type\":\"string\",\"doc\":\"Event 
name.\"},{\"name\":\"metadata\",\"type\":{\"type\":\"map\",\ [...]
+
+    Assert.assertTrue(AvroUtils.checkReaderWriterCompatibility(readerSchema, writerSchema1, true));
+    Assert.assertFalse(AvroUtils.checkReaderWriterCompatibility(readerSchema, writerSchema1, false));
+    Assert.assertFalse(AvroUtils.checkReaderWriterCompatibility(readerSchema, writerSchema2, true));
+  }
+
+  @Test
   public void testGetDirectorySchema() throws IOException {
     Configuration conf = new Configuration();
     conf.set("fs.default.name", "file:///");

Reply via email to