Repository: incubator-gobblin Updated Branches: refs/heads/master 42677dc8c -> ce60d2c7c
[GOBBLIN-195] Ability to switch Avro schema namespace switch before registering with Kafka Avro Schema registry Closes #2049 from abti/avro_schema_namespace_switch Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ce60d2c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ce60d2c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ce60d2c7 Branch: refs/heads/master Commit: ce60d2c7c4e3df1ac85b49993d4a3ccfafc40d2c Parents: 42677dc Author: Abhishek Tiwari <[email protected]> Authored: Wed Aug 9 13:03:09 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Aug 9 13:03:09 2017 -0700 ---------------------------------------------------------------------- .../KafkaSchemaRegistryConfigurationKeys.java | 1 + .../kafka/schemareg/LiKafkaSchemaRegistry.java | 11 ++ .../metrics/kafka/KafkaAvroSchemaRegistry.java | 11 ++ .../reporter/util/KafkaAvroReporterUtil.java | 75 +++++++++++++ .../java/org/apache/gobblin/util/AvroUtils.java | 109 +++++++++++++++++++ .../org/apache/gobblin/util/AvroUtilsTest.java | 23 ++++ 6 files changed, 230 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java index d6f023b..9158663 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java +++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java @@ -24,4 +24,5 @@ public class KafkaSchemaRegistryConfigurationKeys { public final static String KAFKA_SCHEMA_REGISTRY_CLASS = "kafka.schemaRegistry.class"; public final static String KAFKA_SCHEMA_REGISTRY_URL = "kafka.schemaRegistry.url"; public final static String KAFKA_SCHEMA_REGISTRY_CACHE = "kafka.schemaRegistry.cache"; + public final static String KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE = "kafka.schemaRegistry.overrideNamespace"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java index 2ea7075..e6afbae 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java @@ -18,6 +18,7 @@ package org.apache.gobblin.kafka.schemareg; import java.io.IOException; +import java.util.Map; import java.util.Properties; import org.apache.avro.Schema; @@ -29,9 +30,11 @@ import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import 
org.apache.gobblin.configuration.ConfigurationKeys; @@ -53,6 +56,7 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch private final GenericObjectPool<HttpClient> httpClientPool; private final String url; + private final Optional<Map<String, String>> namespaceOverride; /** * @param props properties should contain property "kafka.schema.registry.url", and optionally @@ -64,6 +68,7 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch String.format("Property %s not provided.", KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL)); this.url = props.getProperty(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL); + this.namespaceOverride = KafkaAvroReporterUtil.extractOverrideNamespace(props); int objPoolSize = Integer.parseInt(props.getProperty(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS, @@ -171,6 +176,12 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch * @throws SchemaRegistryException if registration failed */ public synchronized MD5Digest register(Schema schema) throws SchemaRegistryException { + + // Change namespace if override specified + if (this.namespaceOverride.isPresent()) { + schema = AvroUtils.switchNamespace(schema, this.namespaceOverride.get()); + } + LOG.info("Registering schema " + schema.toString()); PostMethod post = new PostMethod(url); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java index d391ef3..4c155fb 100644 --- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java @@ -18,6 +18,7 @@ package org.apache.gobblin.metrics.kafka; import java.io.IOException; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -30,9 +31,11 @@ import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -59,6 +62,7 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema> private final GenericObjectPool<HttpClient> httpClientPool; private final String url; + private final Optional<Map<String, String>> namespaceOverride; /** * @param properties properties should contain property "kafka.schema.registry.url", and optionally @@ -71,6 +75,7 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema> String.format("Property %s not provided.", KAFKA_SCHEMA_REGISTRY_URL)); this.url = props.getProperty(KAFKA_SCHEMA_REGISTRY_URL); + this.namespaceOverride = KafkaAvroReporterUtil.extractOverrideNamespace(props); int objPoolSize = Integer.parseInt(props.getProperty(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS, @@ -186,6 +191,12 @@ public class KafkaAvroSchemaRegistry extends KafkaSchemaRegistry<String, Schema> */ @Override public synchronized String register(Schema schema) throws SchemaRegistryException { + + // Change namespace if override specified + if 
(this.namespaceOverride.isPresent()) { + schema = AvroUtils.switchNamespace(schema, this.namespaceOverride.get()); + } + LOG.info("Registering schema " + schema.toString()); PostMethod post = new PostMethod(url); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java new file mode 100644 index 0000000..1d82921 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.metrics.reporter.util; + +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys; + +import com.google.common.base.Optional; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** Static-only helpers for parsing Kafka schema-registry settings from {@link Properties}. */ +public class KafkaAvroReporterUtil { + /* NOTE(review): no private constructor is declared, so this static-only utility class can still be instantiated. */ + private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults(); + private static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults(); + + /*** + * Parses the Avro namespace overrides configured under {@code kafka.schemaRegistry.overrideNamespace}. + * The value is a comma-separated list of {@code originalNamespace:replacementNamespace} entries; whitespace around entries and parts is trimmed and empty entries are skipped. + * Example config: + * kafka.schemaRegistry.overrideNamespace = namespace1:replacement1,namespace2:replacement2 + * If the same original namespace is listed more than once, the last entry wins. + * For the above example, this method will create a Map with values: + * { + * "namespace1" : "replacement1", + * "namespace2" : "replacement2" + * } + * An entry that does not split into exactly two non-empty parts (e.g. {@code ns1} or {@code ns1:}) causes a {@link RuntimeException}. + * @param properties properties holding the {@code kafka.schemaRegistry.overrideNamespace} setting. + * @return the parsed overrides, or {@link Optional#absent()} if the property is not set or contains no entries. 
+ */ + public static Optional<Map<String, String>> extractOverrideNamespace(Properties properties) { + if (properties.containsKey(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE)) { + /* NOTE(review): containsKey() does not consult the Properties defaults table, whereas getProperty() below does, so an override supplied only as a default is ignored. */ + Map<String, String> namespaceOverridesMap = Maps.newHashMap(); + List<String> namespaceOverrides = Lists.newArrayList(SPLIT_BY_COMMA.split(properties + .getProperty(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE))); + /* Each entry must yield exactly two parts; SPLIT_BY_COLON drops empty parts, so entries such as ns1: or :ns1 are rejected. */ + for (String namespaceOverride : namespaceOverrides) { + List<String> override = Lists.newArrayList(SPLIT_BY_COLON.split(namespaceOverride)); + if (override.size() != 2) { + throw new RuntimeException("Namespace override should be of the format originalNamespace:replacementNamespace," + + " found: " + namespaceOverride); + } + namespaceOverridesMap.put(override.get(0), override.get(1)); + } + /* NOTE(review): IllegalArgumentException would describe the malformed-config failure above more precisely than RuntimeException. */ + // Return the map only if at least one override was parsed; otherwise fall through to absent() + if (namespaceOverridesMap.size() != 0) { + return Optional.of(namespaceOverridesMap); + } + } + // Property not set, or its value contained no entries. 
+ return Optional.<Map<String, String>>absent(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java index 694a40b..d09a6d9 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java @@ -49,6 +49,7 @@ import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.mapred.FsInput; import org.apache.avro.util.Utf8; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -56,6 +57,7 @@ import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.codehaus.jackson.JsonNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -612,6 +614,113 @@ public class AvroUtils { } /** + * Copies the input {@link org.apache.avro.Schema}, recursively replacing the namespace of every named type (ENUM, FIXED, RECORD) whose namespace is a key of {@code namespaceOverride}; record fields, union members, array elements and map values are copied recursively and schema-level JSON properties are carried over. + * @param schema {@link org.apache.avro.Schema} to copy; it is only read, never modified. + * @param namespaceOverride map from original namespace to replacement namespace; a named type whose namespace is not a key keeps its namespace. + * @return a copy of {@code schema} with the overridden namespaces applied (an {@link AvroRuntimeException} is thrown instead for a schema type that has no case below). + */ + public static Schema switchNamespace(Schema schema, Map<String, String> namespaceOverride) { + Schema newSchema; + String newNamespace = StringUtils.EMPTY; + /* NOTE(review): there is no visited set, so a recursive (self-referencing) record schema would make this method recurse until StackOverflowError; confirm such schemas never reach here. */ + // Dispatch on the schema type: named types get the possibly-overridden namespace, + // container types recurse into their children and primitives are recreated with Schema.create() + switch (schema.getType()) { + case ENUM: + newNamespace = namespaceOverride.containsKey(schema.getNamespace()) ? namespaceOverride.get(schema.getNamespace()) + : schema.getNamespace(); + newSchema = + Schema.createEnum(schema.getName(), schema.getDoc(), newNamespace, schema.getEnumSymbols()); + break; + case FIXED: + newNamespace = namespaceOverride.containsKey(schema.getNamespace()) ? 
namespaceOverride.get(schema.getNamespace()) + : schema.getNamespace(); + newSchema = + Schema.createFixed(schema.getName(), schema.getDoc(), newNamespace, schema.getFixedSize()); + break; + case MAP: + newSchema = Schema.createMap(switchNamespace(schema.getValueType(), namespaceOverride)); + break; + case RECORD: + newNamespace = namespaceOverride.containsKey(schema.getNamespace()) ? 
namespaceOverride.get(schema.getNamespace()) + : schema.getNamespace(); + List<Schema.Field> newFields = new ArrayList<>(); + if (schema.getFields().size() > 0) { + for (Schema.Field oldField : schema.getFields()) { + Field newField = new Field(oldField.name(), switchNamespace(oldField.schema(), namespaceOverride), oldField.doc(), + oldField.defaultValue(), oldField.order()); + newFields.add(newField); + } + } + newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), newNamespace, + schema.isError()); + newSchema.setFields(newFields); + break; + case UNION: + List<Schema> newUnionMembers = new ArrayList<>(); + if (null != schema.getTypes() && schema.getTypes().size() > 0) { + for (Schema oldUnionMember : schema.getTypes()) { + newUnionMembers.add(switchNamespace(oldUnionMember, namespaceOverride)); + } + } + newSchema = Schema.createUnion(newUnionMembers); + break; + case ARRAY: + newSchema = Schema.createArray(switchNamespace(schema.getElementType(), namespaceOverride)); + break; + case BOOLEAN: + case BYTES: + case DOUBLE: + case FLOAT: + case INT: + case LONG: + case NULL: + case STRING: + newSchema = Schema.create(schema.getType()); + break; + default: + String exceptionMessage = String.format("Schema namespace replacement failed for \"%s\" ", schema); + LOG.error(exceptionMessage); + /* Defensive guard: presumably unreachable if Schema.Type has no values beyond the cases above (TODO confirm for the Avro version in use). */ + throw new AvroRuntimeException(exceptionMessage); + } + + // Copy schema-level JSON properties. NOTE(review): field-level properties and aliases are not copied (each new Field above is built only from name, schema, doc, default value and order), and schema aliases do not appear to be carried over either; confirm this loss is acceptable. + copyProperties(schema, newSchema); + + return newSchema; + } + + /*** + * Copies every schema-level JSON property of {@code oldSchema} onto {@code newSchema}. 
+ * @param oldSchema Avro Schema to read the properties from; must be non-null (enforced with Preconditions.checkNotNull). + * @param newSchema Avro Schema to add the properties to; must be non-null. + */ + private static void copyProperties(Schema oldSchema, Schema newSchema) { + Preconditions.checkNotNull(oldSchema); + Preconditions.checkNotNull(newSchema); + + Map<String, JsonNode> props = oldSchema.getJsonProps(); + copyProperties(props, newSchema); + } + + /*** + * Adds each entry of {@code props} to {@code schema} via {@code addProp}; a null map copies nothing. + * @param props (may be null) Properties to
copy to Avro Schema + * @param schema Avro Schema to copy properties to + */ + private static void copyProperties(Map<String, JsonNode> props, Schema schema) { + Preconditions.checkNotNull(schema); + + // A null props map is tolerated: nothing is copied and no exception is thrown + if (null != props) { + for (Map.Entry<String, JsonNode> prop : props.entrySet()) { + schema.addProp(prop.getKey(), prop.getValue()); + } + } + } + + /** * Serialize a generic record as a relative {@link Path}. Useful for converting {@link GenericRecord} type keys * into file system locations. For example {field1=v1, field2=v2} returns field1=v1/field2=v2 if includeFieldNames * is true, or v1/v2 if it is false. Illegal HDFS tokens such as ':' and '\\' will be replaced with '_'. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java index cadc507..e89b8b9 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; + import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -37,6 +39,8 @@ import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.Test; +import com.google.common.collect.Maps; + public class AvroUtilsTest { private static final String AVRO_DIR = "gobblin-utility/src/test/resources/avroDirParent/"; @@ -196,6 +200,25 @@ public class AvroUtilsTest { } + @Test + public void testSwitchNamespace() { + String originalNamespace = "originalNamespace"; + String originalName = 
"originalName"; + String newNamespace = "newNamespace"; + Schema schema = SchemaBuilder.builder(originalNamespace).record(originalName).fields(). + requiredDouble("double").optionalFloat("float").endRecord(); + /* Switching the namespace of the record must keep its name and leave its fields, which contain no named types, equal to the originals. */ + Map<String, String> map = Maps.newHashMap(); + map.put(originalNamespace, newNamespace); + Schema newSchema = AvroUtils.switchNamespace(schema, map); + /* NOTE(review): only a flat record is exercised; nested named types, ENUM/FIXED, namespaces absent from the map and recursive schemas are not covered. */ + Assert.assertEquals(newSchema.getNamespace(), newNamespace); + Assert.assertEquals(newSchema.getName(), originalName); + for(Schema.Field field : newSchema.getFields()) { + Assert.assertEquals(field, schema.getField(field.name())); + } + } + @Test public void testSerializeAsPath() throws Exception { Schema schema =
