This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8a9b5d0 [GOBBLIN-763] Support fields removal for compaction dedup key
schema
8a9b5d0 is described below
commit 8a9b5d0c803a95b4bfb9ac3135ee4e849ccdb27b
Author: zhchen <[email protected]>
AuthorDate: Wed May 8 09:49:53 2019 -0700
[GOBBLIN-763] Support fields removal for compaction dedup key schema
Closes #2627 from zxcware/compact
---
.../mapreduce/CompactionAvroJobConfigurator.java | 16 +-
.../avro/MRCompactorAvroKeyDedupJobRunner.java | 1 +
.../mapreduce/CompactionJobConfiguratorTest.java | 51 +++
.../test/resources/dedup-schema/key-schema.avsc | 375 +++++++++++++++++++++
.../java/org/apache/gobblin/util/AvroUtils.java | 34 +-
5 files changed, 463 insertions(+), 14 deletions(-)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index 65fdce5..bee1e9c 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.compaction.mapreduce;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Enums;
import com.google.common.base.Optional;
import java.io.IOException;
@@ -31,6 +32,7 @@ import
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
import
org.apache.gobblin.compaction.mapreduce.avro.AvroKeyRecursiveCombineFileInputFormat;
import
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.filter.AvroSchemaFieldRemover;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
import org.apache.gobblin.util.AvroUtils;
@@ -47,6 +49,9 @@ import org.apache.hadoop.mapreduce.Job;
@Slf4j
public class CompactionAvroJobConfigurator extends CompactionJobConfigurator {
+ /** Value of {@link MRCompactorAvroKeyDedupJobRunner#COMPACTION_JOB_KEY_FIELD_BLACKLIST}, if set: fields removed from the dedup key schema. */
+ private final Optional<String> keyFieldBlacklist;
+
public static class Factory implements
CompactionJobConfigurator.ConfiguratorFactory {
@Override
public CompactionJobConfigurator createConfigurator(State state) throws
IOException {
@@ -65,6 +70,8 @@ public class CompactionAvroJobConfigurator extends
CompactionJobConfigurator {
*/
public CompactionAvroJobConfigurator(State state) throws IOException {
super(state);
+ keyFieldBlacklist =
+
Optional.fromNullable(state.getProp(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_KEY_FIELD_BLACKLIST));
}
/**
@@ -83,7 +90,8 @@ public class CompactionAvroJobConfigurator extends
CompactionJobConfigurator {
/**
* Refer to MRCompactorAvroKeyDedupJobRunner#getKeySchema(Job, Schema)
*/
- private Schema getDedupKeySchema(Schema topicSchema) {
+ @VisibleForTesting
+ Schema getDedupKeySchema(Schema topicSchema) {
boolean keySchemaFileSpecified =
this.state.contains(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
@@ -121,6 +129,14 @@ public class CompactionAvroJobConfigurator extends
CompactionJobConfigurator {
keySchema =
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
}
+
+ // Last step before returning: strip the configured blacklisted fields via AvroSchemaFieldRemover.
+ if (keyFieldBlacklist.isPresent()) {
+ AvroSchemaFieldRemover fieldRemover = new
AvroSchemaFieldRemover(keyFieldBlacklist.get());
+ keySchema = fieldRemover.removeFields(keySchema);
+ log.info("Adjusted key schema {}", keySchema);
+ }
+
return keySchema;
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
index 35a3f82..2a2ab6b 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/MRCompactorAvroKeyDedupJobRunner.java
@@ -77,6 +77,7 @@ public class MRCompactorAvroKeyDedupJobRunner extends
MRCompactorJobRunner {
*/
public static final String COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC =
COMPACTION_JOB_PREFIX + "avro.key.schema.loc";
public static final String COMPACTION_JOB_DEDUP_KEY = COMPACTION_JOB_PREFIX
+ "dedup.key";
+ /**
+ * Blacklist of fields to remove from the dedup key schema; read by CompactionAvroJobConfigurator
+ * and handed to AvroSchemaFieldRemover. Presumably a comma-separated list of dot-separated field
+ * paths, e.g. "auditHeader,value.__ETL_SCN" — TODO confirm against AvroSchemaFieldRemover.
+ */
+ public static final String COMPACTION_JOB_KEY_FIELD_BLACKLIST =
COMPACTION_JOB_PREFIX + "key.fieldBlacklist";
private static final String AVRO = "avro";
private static final String SCHEMA_DEDUP_FIELD_ANNOTATOR = "primarykey";
diff --git
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfiguratorTest.java
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfiguratorTest.java
new file mode 100644
index 0000000..f5f2a83
--- /dev/null
+++
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfiguratorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.avro.Schema;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.AvroUtils;
+
+
+public class CompactionJobConfiguratorTest {
+
+ /** Blacklisted key fields, both top-level and nested, must be absent from the derived dedup key schema. */
+ @Test
+ public void testKeyFieldBlacklist() throws IOException {
+ State state = new State();
+ state.setProp("compaction.job.key.fieldBlacklist",
"auditHeader,value.lumos_dropdate,value.__ETL_SCN");
+ state.setProp("compaction.job.dedup.key", "ALL");
+
+ CompactionAvroJobConfigurator configurator = new
CompactionAvroJobConfigurator(state);
+ try (InputStream keyschema =
getClass().getClassLoader().getResourceAsStream("dedup-schema/key-schema.avsc"))
{
+ Schema topicSchema = new Schema.Parser().parse(keyschema);
+ Schema actualKeySchema = configurator.getDedupKeySchema(topicSchema);
+ Assert.assertEquals(actualKeySchema.getFields().size(), 2);
+ // "value" declares 59 fields in key-schema.avsc; two of them are blacklisted.
+
Assert.assertEquals(actualKeySchema.getField("value").schema().getFields().size(),
57);
+ Assert.assertFalse(AvroUtils.getFieldSchema(actualKeySchema,
"auditHeader").isPresent());
+ Assert.assertFalse(AvroUtils.getFieldSchema(actualKeySchema,
"value.lumos_dropdate").isPresent());
+ Assert.assertFalse(AvroUtils.getFieldSchema(actualKeySchema,
"value.__ETL_SCN").isPresent());
+
+ }
+ }
+}
diff --git a/gobblin-compaction/src/test/resources/dedup-schema/key-schema.avsc
b/gobblin-compaction/src/test/resources/dedup-schema/key-schema.avsc
new file mode 100644
index 0000000..38f3f4e
--- /dev/null
+++ b/gobblin-compaction/src/test/resources/dedup-schema/key-schema.avsc
@@ -0,0 +1,375 @@
+{
+ "type" : "record",
+ "name" : "ORDERS2_etl",
+ "namespace" : "OMS_etl",
+ "fields" : [ {
+ "name" : "metadata",
+ "type" : {
+ "type" : "map",
+ "values" : "string"
+ },
+ "doc" : "Event metadata.",
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":null}"
+ }, {
+ "name" : "key",
+ "type" : "bytes",
+ "doc" : "serialized key.",
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":null}"
+ }, {
+ "name" : "value",
+ "type" : {
+ "type" : "record",
+ "name" : "ORDERS2",
+ "namespace" : "OMS",
+ "doc" : "Auto-generated Avro schema for ORDERS2. Generated at Feb 15,
2019 10:17:30 PM UTC",
+ "fields" : [ {
+ "name" : "ORDER_ID",
+ "type" : "long",
+ "default" : 0,
+ "attributes_json" : "{\"delta\":false,\"pk\":true,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "DATE_PLACED",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"DATE\"}"
+ }, {
+ "name" : "INVOICE_STATUS",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+ }, {
+ "name" : "MEMBER_ID",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "PAYPAL_TX_ID",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NVARCHAR2\"}"
+ }, {
+ "name" : "TX_TYPE",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+ }, {
+ "name" : "TX_AMOUNT",
+ "type" : [ "null", "double" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "TX_CURRENCY",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+ }, {
+ "name" : "SALE_TX_ID",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NVARCHAR2\"}"
+ }, {
+ "name" : "ORDER_STATE",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+ }, {
+ "name" : "REFUND_DATE",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"DATE\"}"
+ }, {
+ "name" : "UPG_ORIG_ORDER_ID",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "UPG_CREDIT_AMT",
+ "type" : [ "null", "double" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "UPG_CREDIT_CUR",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+ }, {
+ "name" : "ORDER_INFO_VER",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "ACCOUNT_ID",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "LOCALE",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "LAST_MODIFIED",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" : "{\"delta\":true,\"pk\":false,\"type\":\"DATE\"}"
+ }, {
+ "name" : "DATE_CREATED",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"DATE\"}"
+ }, {
+ "name" : "CREATED_BY",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "UPDATED_BY",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "UPDATE_COUNT",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "ORDER_TYPE",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+ }, {
+ "name" : "REF_ORDER_ID",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "GLOBAL_TX_ID",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "BUYER_ID",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "BUYER_TYPE",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "VALIDATE_COUNT",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "TAX_AMOUNT",
+ "type" : [ "null", "double" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "TAX_RATE",
+ "type" : [ "null", "double" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "TAX_TYPE",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "TAX_LOG_ENTRY_ID",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "CONTACT_INFO_HASH",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "CUSTOMER_ACCOUNT_ID",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "TAX_ID",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "OWNER_ID",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "OWNER_TYPE",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "RESOLVED_BY_CS",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+ }, {
+ "name" : "CS_COMMENTS",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "USD_EXCHANGE_RATE_ID",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "TAX_EXEMPTION_ID",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "EXEMPTION_DOCUMENT",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "PAYMENT_ACCOUNT_ID",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "GG_MODI_TS",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"TIMESTAMP\"}"
+ }, {
+ "name" : "GG_STATUS",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "ORIGIN_DATA_CENTER",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "ATTRIBUTES",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"XMLTYPE\"}"
+ }, {
+ "name" : "TX_CHANNEL",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "ORDER_SUBTYPE",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "SOURCE_APP",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "SHOW_RECEIPT",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "FULFILLED_IN_DC",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "HOME_CURRENCY",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":\"CHAR\"}"
+ }, {
+ "name" : "HOME_CURRENCY_EXCHANGE_RATE_ID",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "ORDER_NUMBER",
+ "type" : [ "null", "string" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"VARCHAR2\"}"
+ }, {
+ "name" : "DELETED_TS",
+ "type" : [ "null", "long" ],
+ "default" : null,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"TIMESTAMP\"}"
+ }, {
+ "name" : "lumos_dropdate",
+ "type" : "long",
+ "doc" : "",
+ "default" : 0,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\"}"
+ }, {
+ "name" : "__ETL_GG_MODI_MICROS",
+ "type" : "long",
+ "doc" : "A private field reserved only for ETL purpose",
+ "default" : 0,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"MICROS\",\"aux_delta\":1}"
+ }, {
+ "name" : "__ETL_SCN",
+ "type" : "long",
+ "doc" : "A private field reserved only for ETL purpose",
+ "default" : 0,
+ "attributes_json" :
"{\"delta\":false,\"pk\":false,\"type\":\"NUMBER\",\"aux_delta\":2}"
+ } ],
+ "attributes_json" :
"{\"instance\":\"oracle\",\"dumpdate\":\"20190502081051\",\"isFull\":false,\"begin_date\":0,\"end_date\":0,\"total_records\":-1,\"isSharded\":false,\"isSecured\":false,\"permission_group\":\"\",\"datasource_colo\":\"ei-ltx1\",\"table_type\":\"SNAPSHOT_APPEND\"}"
+ },
+ "doc" : "Decorated payload data",
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":null}"
+ }, {
+ "name" : "auditHeader",
+ "type" : [ "null", {
+ "type" : "record",
+ "name" : "KafkaAuditHeader",
+ "namespace" : "com.linkedin.events",
+ "fields" : [ {
+ "name" : "time",
+ "type" : "long",
+ "doc" : "The time at which the event was emitted into kafka."
+ }, {
+ "name" : "server",
+ "type" : "string",
+ "doc" : "The fully qualified name of the host from which the event is
being emitted."
+ }, {
+ "name" : "instance",
+ "type" : [ "null", "string" ],
+ "doc" : "The instance on the server from which the event is being
emitted. e.g. i001"
+ }, {
+ "name" : "appName",
+ "type" : "string",
+ "doc" : "The name of the application from which the event is being
emitted. see go/appname"
+ }, {
+ "name" : "messageId",
+ "type" : {
+ "type" : "fixed",
+ "name" : "UUID",
+ "size" : 16
+ },
+ "doc" : "A unique identifier for the message"
+ }, {
+ "name" : "auditVersion",
+ "type" : [ "null", "int" ],
+ "doc" : "The version that is being used for auditing. In version 0,
the audit trail buckets events into 10 minute audit windows based on the
EventHeader timestamp. In version 1, the audit trail buckets events as follows:
if the schema has an outer KafkaAuditHeader, use the outer audit header
timestamp for bucketing; else if the EventHeader has an inner KafkaAuditHeader
use that inner audit header's timestamp for bucketing",
+ "default" : null
+ }, {
+ "name" : "fabricUrn",
+ "type" : [ "null", "string" ],
+ "doc" : "The fabricUrn of the host from which the event is being
emitted. Fabric Urn in the format of urn:li:fabric:{fabric_name}. See
go/fabric.",
+ "default" : null
+ } ]
+ } ],
+ "doc" : "Header used by kafka for auditing the data in the kafka pipeline",
+ "default" : null,
+ "attributes_json" : "{\"delta\":false,\"pk\":false,\"type\":null}"
+ } ],
+ "attributes_json" :
"{\"instance\":\"oracle\",\"dumpdate\":\"20190502081051\",\"isFull\":false,\"begin_date\":0,\"end_date\":0,\"total_records\":-1,\"isSharded\":false,\"isSecured\":false,\"permission_group\":\"\",\"datasource_colo\":\"ei-ltx1\",\"table_type\":\"SNAPSHOT_APPEND\"}"
+}
\ No newline at end of file
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index fccd46e..43f03f7 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -74,6 +74,7 @@ import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
@@ -607,10 +608,10 @@ public class AvroUtils {
* MapReduce job.
*/
public static Optional<Schema> removeUncomparableFields(Schema schema) {
- return removeUncomparableFields(schema, Sets.<Schema> newHashSet());
+ return removeUncomparableFields(schema, Maps.newHashMap());
}
- private static Optional<Schema> removeUncomparableFields(Schema schema,
Set<Schema> processed) {
+ private static Optional<Schema> removeUncomparableFields(Schema schema,
Map<Schema, Optional<Schema>> processed) {
switch (schema.getType()) {
case RECORD:
return removeUncomparableFieldsFromRecord(schema, processed);
@@ -627,13 +628,16 @@ public class AvroUtils {
}
}
- private static Optional<Schema> removeUncomparableFieldsFromRecord(Schema
record, Set<Schema> processed) {
+ /** Memoizes results in {@code processed}; a record reached again while still being processed resolves to absent. */
+ private static Optional<Schema> removeUncomparableFieldsFromRecord(Schema
record, Map<Schema, Optional<Schema>> processed) {
Preconditions.checkArgument(record.getType() == Schema.Type.RECORD);
- if (processed.contains(record)) {
- return Optional.absent();
+ Optional<Schema> result = processed.get(record);
+ if (null != result) {
+ return result;
}
- processed.add(record);
+ // Seed a placeholder before recursing so a self-referencing record cannot recurse forever.
+ processed.put(record, Optional.<Schema>absent());
List<Field> fields = Lists.newArrayList();
for (Field field : record.getFields()) {
@@ -645,16 +649,19 @@ public class AvroUtils {
Schema newSchema = Schema.createRecord(record.getName(), record.getDoc(),
record.getNamespace(), false);
newSchema.setFields(fields);
- return Optional.of(newSchema);
+ result = Optional.of(newSchema);
+ processed.put(record, result);
+
+ return result;
}
- private static Optional<Schema> removeUncomparableFieldsFromUnion(Schema
union, Set<Schema> processed) {
+ private static Optional<Schema> removeUncomparableFieldsFromUnion(Schema
union, Map<Schema, Optional<Schema>> processed) {
Preconditions.checkArgument(union.getType() == Schema.Type.UNION);
- if (processed.contains(union)) {
- return Optional.absent();
+ Optional<Schema> result = processed.get(union);
+ if (null != result) {
+ return result;
}
- processed.add(union);
List<Schema> newUnion = Lists.newArrayList();
for (Schema unionType : union.getTypes()) {
@@ -666,9 +673,13 @@ public class AvroUtils {
// Discard the union field if one or more types are removed from the union.
if (newUnion.size() != union.getTypes().size()) {
- return Optional.absent();
+ result = Optional.absent();
+ } else {
+ result = Optional.of(Schema.createUnion(newUnion));
}
- return Optional.of(Schema.createUnion(newUnion));
+ processed.put(union, result);
+
+ return result;
}
/**
/**