Repository: incubator-gobblin
Updated Branches:
  refs/heads/master dad2a8c2a -> 261fce33c
[GOBBLIN-198] Implement configuration to disable switching the Kafka topic's and Avro schema's names before registering schema
[GOBBLIN-198] Implement configuration to disable switching the Kafka topic's and Avro schema's names before registering schema
Fix FindBugs for LiKafkaSchemaRegistry and use getPropAsBoolean
Closes #2056 from jenniferzheng/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/261fce33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/261fce33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/261fce33
Branch: refs/heads/master
Commit: 261fce33c7b8193d2edf67403761a8a77f78b9e1
Parents: dad2a8c
Author: Jennifer Zheng <[email protected]>
Authored: Tue Aug 15 12:35:21 2017 -0700
Committer: Abhishek Tiwari <[email protected]>
Committed: Tue Aug 15 12:35:21 2017 -0700
----------------------------------------------------------------------
 .../KafkaSchemaRegistryConfigurationKeys.java | 2 ++
 .../kafka/schemareg/LiKafkaSchemaRegistry.java | 22 ++++++++++++++------
 .../writer/KafkaWriterConfigurationKeys.java | 2 ++
 .../gobblin/kafka/writer/KafkaWriterHelper.java | 1 +
 4 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261fce33/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
index 9158663..9b5bdc6 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java @@ -24,5 +24,7 @@ public class KafkaSchemaRegistryConfigurationKeys { public final static String KAFKA_SCHEMA_REGISTRY_CLASS = "kafka.schemaRegistry.class"; public final static String KAFKA_SCHEMA_REGISTRY_URL = "kafka.schemaRegistry.url"; public final static String KAFKA_SCHEMA_REGISTRY_CACHE = "kafka.schemaRegistry.cache"; + public final static String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME = "kafka.schemaRegistry.switchName"; + public final static String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT = "true"; public final static String KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE = "kafka.schemaRegistry.overrideNamespace"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261fce33/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java index e6afbae..d046747 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java @@ -31,6 +31,7 @@ import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil; +import org.apache.gobblin.util.PropertiesUtils; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -57,6 +58,7 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch private final GenericObjectPool<HttpClient> httpClientPool; private final String url; private final Optional<Map<String, String>> namespaceOverride; + private final boolean switchTopicNames; /** * @param props properties should contain property "kafka.schema.registry.url", and optionally @@ -69,6 +71,8 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch this.url = props.getProperty(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL); this.namespaceOverride = KafkaAvroReporterUtil.extractOverrideNamespace(props); + this.switchTopicNames = PropertiesUtils.getPropAsBoolean(props, KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_SWITCH_NAME, + KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT); int objPoolSize = Integer.parseInt(props.getProperty(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS, @@ -152,9 +156,10 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch /** * Register a schema to the Kafka schema registry under the provided input name. This method will change the name - * of the schema to the provided name. This is useful because certain services (like Gobblin kafka adaptor and + * of the schema to the provided name if configured to do so. This is useful because certain services (like Gobblin kafka adaptor and * Camus) get the schema for a topic by querying for the latest schema with the topic name, requiring the topic - * name and schema name to match for all topics. This method registers the schema to the schema registry in such a + * name and schema name to match for all topics. If it is not configured to switch names, this is useful for the case + * where the Kafka topic and Avro schema names do not match. 
This method registers the schema to the schema registry in such a * way that any schema can be written to any topic. * * @param schema {@link org.apache.avro.Schema} to register. @@ -165,17 +170,24 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch */ @Override public MD5Digest register(String name, Schema schema) throws SchemaRegistryException { - return register(AvroUtils.switchName(schema, name)); + PostMethod post = new PostMethod(url); + if (this.switchTopicNames) { + return register(AvroUtils.switchName(schema, name), post); + } else { + post.addParameter("name", name); + return register(schema, post); + } } /** * Register a schema to the Kafka schema registry * * @param schema + * @param post * @return schema ID of the registered schema * @throws SchemaRegistryException if registration failed */ - public synchronized MD5Digest register(Schema schema) throws SchemaRegistryException { + public synchronized MD5Digest register(Schema schema, PostMethod post) throws SchemaRegistryException { // Change namespace if override specified if (this.namespaceOverride.isPresent()) { @@ -184,7 +196,6 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch LOG.info("Registering schema " + schema.toString()); - PostMethod post = new PostMethod(url); post.addParameter("schema", schema.toString()); HttpClient httpClient = this.borrowClient(); @@ -258,5 +269,4 @@ public class LiKafkaSchemaRegistry implements KafkaSchemaRegistry<MD5Digest, Sch return schema; } - } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261fce33/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java index 0303341..f6776c0 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java @@ -47,5 +47,7 @@ public class KafkaWriterConfigurationKeys { static final String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer"; static final String CLIENT_ID_CONFIG = "client.id"; static final String CLIENT_ID_DEFAULT = "gobblin"; + static final String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME = "kafka.schemaRegistry.switchName"; + static final String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT = "true"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261fce33/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java index b717d43..3f52645 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java @@ -48,6 +48,7 @@ public class KafkaWriterHelper { setDefaultIfUnset(producerProperties, KEY_SERIALIZER_CONFIG, DEFAULT_KEY_SERIALIZER); setDefaultIfUnset(producerProperties, VALUE_SERIALIZER_CONFIG, DEFAULT_VALUE_SERIALIZER); setDefaultIfUnset(producerProperties, CLIENT_ID_CONFIG, CLIENT_ID_DEFAULT); + setDefaultIfUnset(producerProperties, KAFKA_SCHEMA_REGISTRY_SWITCH_NAME, KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT); return 
producerProperties; }
