This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a9b5d0  [GOBBLIN-763] Support fields removal for compaction dedup key schema
8a9b5d0 is described below

commit 8a9b5d0c803a95b4bfb9ac3135ee4e849ccdb27b
Author: zhchen <[email protected]>
AuthorDate: Wed May 8 09:49:53 2019 -0700

    [GOBBLIN-763] Support fields removal for compaction dedup key schema
    
    Closes #2627 from zxcware/compact
---
 .../mapreduce/CompactionAvroJobConfigurator.java   |  16 +-
 .../avro/MRCompactorAvroKeyDedupJobRunner.java     |   1 +
 .../mapreduce/CompactionJobConfiguratorTest.java   |  51 +++
 .../test/resources/dedup-schema/key-schema.avsc    | 375 +++++++++++++++++++++
 .../java/org/apache/gobblin/util/AvroUtils.java    |  34 +-
 5 files changed, 463 insertions(+), 14 deletions(-)

diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index 65fdce5..bee1e9c 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Enums;
 import com.google.common.base.Optional;
 import java.io.IOException;
@@ -31,6 +32,7 @@ import 
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
 import 
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyRecursiveCombineFileInputFormat;
 import 
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.filter.AvroSchemaFieldRemover;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
 import org.apache.gobblin.util.AvroUtils;
@@ -47,6 +49,8 @@ import org.apache.hadoop.mapreduce.Job;
 @Slf4j
 public class CompactionAvroJobConfigurator extends CompactionJobConfigurator {
 
+  private Optional<String> keyFieldBlacklist;
+
   public static class Factory implements 
CompactionJobConfigurator.ConfiguratorFactory {
     @Override
     public CompactionJobConfigurator createConfigurator(State state) throws 
IOException {
@@ -65,6 +69,8 @@ public class CompactionAvroJobConfigurator extends 
CompactionJobConfigurator {
    */
   public CompactionAvroJobConfigurator(State state) throws IOException {
     super(state);
+    keyFieldBlacklist =
+        
Optional.fromNullable(state.getProp(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_KEY_FIELD_BLACKLIST));
   }
 
   /**
@@ -83,7 +89,8 @@ public class CompactionAvroJobConfigurator extends 
CompactionJobConfigurator {
   /**
    * Refer to MRCompactorAvroKeyDedupJobRunner#getKeySchema(Job, Schema)
    */
-  private Schema getDedupKeySchema(Schema topicSchema) {
+  @VisibleForTesting
+  Schema getDedupKeySchema(Schema topicSchema) {
 
     boolean keySchemaFileSpecified =
         
this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
@@ -121,6 +128,13 @@ public class CompactionAvroJobConfigurator extends 
CompactionJobConfigurator {
       keySchema = 
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
     }
 
+
+    if (keyFieldBlacklist.isPresent()) {
+      AvroSchemaFieldRemover fieldRemover = new 
AvroSchemaFieldRemover(keyFieldBlacklist.get());
+      keySchema = fieldRemover.removeFields(keySchema);
+      log.info("Adjusted key schema {}", keySchema);
+    }
+
     return keySchema;
   }
 
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
index 35a3f82..2a2ab6b 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
@@ -77,6 +77,7 @@ public class MRCompactorAvroKeyDedupJobRunner extends 
MRCompactorJobRunner {
    */
   public static final String COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC = 
COMPACTION_JOB_PREFIX + "avro.key.schema.loc";
   public static final String COMPACTION_JOB_DEDUP_KEY = COMPACTION_JOB_PREFIX 
+ "dedup.key";
+  public static final String COMPACTION_JOB_KEY_FIELD_BLACKLIST = 
COMPACTION_JOB_PREFIX + "key.fieldBlacklist";
 
   private static final String AVRO = "avro";
   private static final String SCHEMA_DEDUP_FIELD_ANNOTATOR = "primarykey";
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfiguratorTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfiguratorTest.java
new file mode 100644
index 0000000..f5f2a83
--- /dev/null
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfiguratorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.avro.Schema;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.AvroUtils;
+
+
+public class CompactionJobConfiguratorTest {
+  /** Every path listed in compaction.job.key.fieldBlacklist must be removed from the derived dedup key schema. */
+  @Test
+  public void testKeyFieldBlacklist() throws IOException {
+    State state = new State();
+    state.setProp("compaction.job.key.fieldBlacklist", "auditHeader,value.lumos_dropdate,value.__ETL_SCN");
+    state.setProp("compaction.job.dedup.key", "ALL");
+
+    CompactionAvroJobConfigurator configurator = new CompactionAvroJobConfigurator(state);
+    try (InputStream keyschema = getClass().getClassLoader().getResourceAsStream("dedup-schema/key-schema.avsc")) {
+      Schema topicSchema = new Schema.Parser().parse(keyschema);
+      Assert.assertTrue(AvroUtils.getFieldSchema(topicSchema, "auditHeader").isPresent(), "fixture must contain auditHeader");
+      Schema actualKeySchema = configurator.getDedupKeySchema(topicSchema);
+      Assert.assertEquals(actualKeySchema.getFields().size(), 2);
+      Assert.assertEquals(actualKeySchema.getField("value").schema().getFields().size(), 57);
+      Assert.assertFalse(AvroUtils.getFieldSchema(actualKeySchema, "auditHeader").isPresent());
+      Assert.assertFalse(AvroUtils.getFieldSchema(actualKeySchema, "value.lumos_dropdate").isPresent());
+      Assert.assertFalse(AvroUtils.getFieldSchema(actualKeySchema, "value.__ETL_SCN").isPresent());
+    }
+  }
+}
diff --git a/gobblin-compaction/src/test/resources/dedup-schema/key-schema.avsc 
b/gobblin-compaction/src/test/resources/dedup-schema/key-schema.avsc
new file mode 100644
index 0000000..38f3f4e
--- /dev/null
+++ b/gobblin-compaction/src/test/resources/dedup-schema/key-schema.avsc
@@ -0,0 +1,375 @@
+{
+  "type" : "record",
+  "name" : "ORDERS2_etl",
+  "namespace" : "OMS_etl",
+  "fields" : [ {
+    "name" : "metadata",
+    "type" : {
+      "type" : "map",
+      "values" : "string"
+    },
+    "doc" : "Event metadata.",
+    "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":null}"
+  }, {
+    "name" : "key",
+    "type" : "bytes",
+    "doc" : "serialized key.",
+    "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":null}"
+  }, {
+    "name" : "value",
+    "type" : {
+      "type" : "record",
+      "name" : "ORDERS2",
+      "namespace" : "OMS",
+      "doc" : "Auto-generated Avro schema for ORDERS2. Generated at Feb 15, 
2019 10:17:30 PM UTC",
+      "fields" : [ {
+        "name" : "ORDER_ID",
+        "type" : "long",
+        "default" : 0,
+        "attributes_json" : "{\"delta\":false,\"pk\":true,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "DATE_PLACED",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"DATE\"}"
+      }, {
+        "name" : "INVOICE_STATUS",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+      }, {
+        "name" : "MEMBER_ID",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "PAYPAL_TX_ID",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NVARCHAR2\"}"
+      }, {
+        "name" : "TX_TYPE",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+      }, {
+        "name" : "TX_AMOUNT",
+        "type" : [ "null", "double" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "TX_CURRENCY",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+      }, {
+        "name" : "SALE_TX_ID",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NVARCHAR2\"}"
+      }, {
+        "name" : "ORDER_STATE",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+      }, {
+        "name" : "REFUND_DATE",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"DATE\"}"
+      }, {
+        "name" : "UPG_ORIG_ORDER_ID",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "UPG_CREDIT_AMT",
+        "type" : [ "null", "double" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "UPG_CREDIT_CUR",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+      }, {
+        "name" : "ORDER_INFO_VER",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "ACCOUNT_ID",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "LOCALE",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "LAST_MODIFIED",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : "{\"delta\":true,\"pk\":false,\"type\":\"DATE\"}"
+      }, {
+        "name" : "DATE_CREATED",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"DATE\"}"
+      }, {
+        "name" : "CREATED_BY",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "UPDATED_BY",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "UPDATE_COUNT",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "ORDER_TYPE",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+      }, {
+        "name" : "REF_ORDER_ID",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "GLOBAL_TX_ID",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "BUYER_ID",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "BUYER_TYPE",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "VALIDATE_COUNT",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "TAX_AMOUNT",
+        "type" : [ "null", "double" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "TAX_RATE",
+        "type" : [ "null", "double" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "TAX_TYPE",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "TAX_LOG_ENTRY_ID",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "CONTACT_INFO_HASH",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "CUSTOMER_ACCOUNT_ID",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "TAX_ID",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "OWNER_ID",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "OWNER_TYPE",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "RESOLVED_BY_CS",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+      }, {
+        "name" : "CS_COMMENTS",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "USD_EXCHANGE_RATE_ID",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "TAX_EXEMPTION_ID",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "EXEMPTION_DOCUMENT",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "PAYMENT_ACCOUNT_ID",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "GG_MODI_TS",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"TIMESTAMP\"}"
+      }, {
+        "name" : "GG_STATUS",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "ORIGIN_DATA_CENTER",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "ATTRIBUTES",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"XMLTYPE\"}"
+      }, {
+        "name" : "TX_CHANNEL",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "ORDER_SUBTYPE",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "SOURCE_APP",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "SHOW_RECEIPT",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "FULFILLED_IN_DC",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "HOME_CURRENCY",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+      }, {
+        "name" : "HOME_CURRENCY_EXCHANGE_RATE_ID",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "ORDER_NUMBER",
+        "type" : [ "null", "string" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+      }, {
+        "name" : "DELETED_TS",
+        "type" : [ "null", "long" ],
+        "default" : null,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"TIMESTAMP\"}"
+      }, {
+        "name" : "lumos_dropdate",
+        "type" : "long",
+        "doc" : "",
+        "default" : 0,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+      }, {
+        "name" : "__ETL_GG_MODI_MICROS",
+        "type" : "long",
+        "doc" : "A private field reserved only for ETL purpose",
+        "default" : 0,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"MICROS\",\"aux_delta\":1}"
+      }, {
+        "name" : "__ETL_SCN",
+        "type" : "long",
+        "doc" : "A private field reserved only for ETL purpose",
+        "default" : 0,
+        "attributes_json" : 
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\",\"aux_delta\":2}"
+      } ],
+      "attributes_json" : 
"{\"instance\":\"oracle\",\"dumpdate\":\"20190502081051\",\"isFull\":false,\"begin_date\":0,\"end_date\":0,\"total_records\":-1,\"isSharded\":false,\"isSecured\":false,\"permission_group\":\"\",\"datasource_colo\":\"ei-ltx1\",\"table_type\":\"SNAPSHOT_APPEND\"}"
+    },
+    "doc" : "Decorated payload data",
+    "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":null}"
+  }, {
+    "name" : "auditHeader",
+    "type" : [ "null", {
+      "type" : "record",
+      "name" : "KafkaAuditHeader",
+      "namespace" : "com.linkedin.events",
+      "fields" : [ {
+        "name" : "time",
+        "type" : "long",
+        "doc" : "The time at which the event was emitted into kafka."
+      }, {
+        "name" : "server",
+        "type" : "string",
+        "doc" : "The fully qualified name of the host from which the event is 
being emitted."
+      }, {
+        "name" : "instance",
+        "type" : [ "null", "string" ],
+        "doc" : "The instance on the server from which the event is being 
emitted. e.g. i001"
+      }, {
+        "name" : "appName",
+        "type" : "string",
+        "doc" : "The name of the application from which the event is being 
emitted. see go/appname"
+      }, {
+        "name" : "messageId",
+        "type" : {
+          "type" : "fixed",
+          "name" : "UUID",
+          "size" : 16
+        },
+        "doc" : "A unique identifier for the message"
+      }, {
+        "name" : "auditVersion",
+        "type" : [ "null", "int" ],
+        "doc" : "The version that is being used for auditing. In version 0, 
the audit trail buckets events into 10 minute audit windows based on the 
EventHeader timestamp. In version 1, the audit trail buckets events as follows: 
if the schema has an outer KafkaAuditHeader, use the outer audit header 
timestamp for bucketing; else if the EventHeader has an inner KafkaAuditHeader 
use that inner audit header's timestamp for bucketing",
+        "default" : null
+      }, {
+        "name" : "fabricUrn",
+        "type" : [ "null", "string" ],
+        "doc" : "The fabricUrn of the host from which the event is being 
emitted. Fabric Urn in the format of urn:li:fabric:{fabric_name}. See 
go/fabric.",
+        "default" : null
+      } ]
+    } ],
+    "doc" : "Header used by kafka for auditing the data in the kafka pipeline",
+    "default" : null,
+    "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":null}"
+  } ],
+  "attributes_json" : 
"{\"instance\":\"oracle\",\"dumpdate\":\"20190502081051\",\"isFull\":false,\"begin_date\":0,\"end_date\":0,\"total_records\":-1,\"isSharded\":false,\"isSecured\":false,\"permission_group\":\"\",\"datasource_colo\":\"ei-ltx1\",\"table_type\":\"SNAPSHOT_APPEND\"}"
+}
\ No newline at end of file
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index fccd46e..43f03f7 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -74,6 +74,7 @@ import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closer;
 
@@ -607,10 +608,10 @@ public class AvroUtils {
    * MapReduce job.
    */
   public static Optional<Schema> removeUncomparableFields(Schema schema) {
-    return removeUncomparableFields(schema, Sets.<Schema> newHashSet());
+    return removeUncomparableFields(schema, Maps.newHashMap());
   }
 
-  private static Optional<Schema> removeUncomparableFields(Schema schema, 
Set<Schema> processed) {
+  private static Optional<Schema> removeUncomparableFields(Schema schema, 
Map<Schema, Optional<Schema>> processed) {
     switch (schema.getType()) {
       case RECORD:
         return removeUncomparableFieldsFromRecord(schema, processed);
@@ -627,13 +628,13 @@ public class AvroUtils {
     }
   }
 
-  private static Optional<Schema> removeUncomparableFieldsFromRecord(Schema 
record, Set<Schema> processed) {
+  private static Optional<Schema> removeUncomparableFieldsFromRecord(Schema 
record, Map<Schema, Optional<Schema>> processed) {
     Preconditions.checkArgument(record.getType() == Schema.Type.RECORD);
 
-    if (processed.contains(record)) {
-      return Optional.absent();
+    Optional<Schema> result = processed.get(record);
+    if (null != result) {
+      return result;
     }
-    processed.add(record);
 
     List<Field> fields = Lists.newArrayList();
     for (Field field : record.getFields()) {
@@ -645,16 +646,19 @@ public class AvroUtils {
 
     Schema newSchema = Schema.createRecord(record.getName(), record.getDoc(), 
record.getNamespace(), false);
     newSchema.setFields(fields);
-    return Optional.of(newSchema);
+    result = Optional.of(newSchema);
+    processed.put(record, result);
+
+    return result;
   }
 
-  private static Optional<Schema> removeUncomparableFieldsFromUnion(Schema 
union, Set<Schema> processed) {
+  private static Optional<Schema> removeUncomparableFieldsFromUnion(Schema 
union, Map<Schema, Optional<Schema>> processed) {
     Preconditions.checkArgument(union.getType() == Schema.Type.UNION);
 
-    if (processed.contains(union)) {
-      return Optional.absent();
+    Optional<Schema> result = processed.get(union);
+    if (null != result) {
+      return result;
     }
-    processed.add(union);
 
     List<Schema> newUnion = Lists.newArrayList();
     for (Schema unionType : union.getTypes()) {
@@ -666,9 +670,13 @@ public class AvroUtils {
 
     // Discard the union field if one or more types are removed from the union.
     if (newUnion.size() != union.getTypes().size()) {
-      return Optional.absent();
+      result = Optional.absent();
+    } else {
+      result = Optional.of(Schema.createUnion(newUnion));
     }
-    return Optional.of(Schema.createUnion(newUnion));
+    processed.put(union, result);
+
+    return result;
   }
 
   /**

Reply via email to