Repository: incubator-gobblin
Updated Branches:
  refs/heads/master dad2a8c2a -> 261fce33c


[GOBBLIN-198] Implement configuration to disable switching the Kafka topic's 
and Avro schema's names before registering schema

[GOBBLIN-198] Implement configuration to disable
switching the Kafka topic's and Avro schema's
names before registering schema

Fix FindBugs for LiKafkaSchemaRegistry and use
getPropAsBoolean

Closes #2056 from jenniferzheng/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/261fce33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/261fce33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/261fce33

Branch: refs/heads/master
Commit: 261fce33c7b8193d2edf67403761a8a77f78b9e1
Parents: dad2a8c
Author: Jennifer Zheng <[email protected]>
Authored: Tue Aug 15 12:35:21 2017 -0700
Committer: Abhishek Tiwari <[email protected]>
Committed: Tue Aug 15 12:35:21 2017 -0700

----------------------------------------------------------------------
 .../KafkaSchemaRegistryConfigurationKeys.java   |  2 ++
 .../kafka/schemareg/LiKafkaSchemaRegistry.java  | 22 ++++++++++++++------
 .../writer/KafkaWriterConfigurationKeys.java    |  2 ++
 .../gobblin/kafka/writer/KafkaWriterHelper.java |  1 +
 4 files changed, 21 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261fce33/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
index 9158663..9b5bdc6 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
@@ -24,5 +24,7 @@ public class KafkaSchemaRegistryConfigurationKeys {
   public final static String KAFKA_SCHEMA_REGISTRY_CLASS = 
"kafka.schemaRegistry.class";
   public final static String KAFKA_SCHEMA_REGISTRY_URL = 
"kafka.schemaRegistry.url";
   public final static String KAFKA_SCHEMA_REGISTRY_CACHE = 
"kafka.schemaRegistry.cache";
+  public final static String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME = 
"kafka.schemaRegistry.switchName";
+  public final static String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT = 
"true";
   public final static String KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE = 
"kafka.schemaRegistry.overrideNamespace";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261fce33/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
index e6afbae..d046747 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
@@ -31,6 +31,7 @@ import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.PropertiesUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +58,7 @@ public class LiKafkaSchemaRegistry implements 
KafkaSchemaRegistry<MD5Digest, Sch
   private final GenericObjectPool<HttpClient> httpClientPool;
   private final String url;
   private final Optional<Map<String, String>> namespaceOverride;
+  private final boolean switchTopicNames;
 
   /**
    * @param props properties should contain property 
"kafka.schema.registry.url", and optionally
@@ -69,6 +71,8 @@ public class LiKafkaSchemaRegistry implements 
KafkaSchemaRegistry<MD5Digest, Sch
 
     this.url = 
props.getProperty(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL);
     this.namespaceOverride = 
KafkaAvroReporterUtil.extractOverrideNamespace(props);
+    this.switchTopicNames = PropertiesUtils.getPropAsBoolean(props, 
KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_SWITCH_NAME,
+        
KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT);
 
     int objPoolSize =
         
Integer.parseInt(props.getProperty(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
@@ -152,9 +156,10 @@ public class LiKafkaSchemaRegistry implements 
KafkaSchemaRegistry<MD5Digest, Sch
 
   /**
    * Register a schema to the Kafka schema registry under the provided input 
name. This method will change the name
-   * of the schema to the provided name. This is useful because certain 
services (like Gobblin kafka adaptor and
+   * of the schema to the provided name if configured to do so. This is useful 
because certain services (like Gobblin kafka adaptor and
    * Camus) get the schema for a topic by querying for the latest schema with 
the topic name, requiring the topic
-   * name and schema name to match for all topics. This method registers the 
schema to the schema registry in such a
+   * name and schema name to match for all topics. If it is not configured to 
switch names, this is useful for the case
+   * where the Kafka topic and Avro schema names do not match. This method 
registers the schema to the schema registry in such a
    * way that any schema can be written to any topic.
    *
    * @param schema {@link org.apache.avro.Schema} to register.
@@ -165,17 +170,24 @@ public class LiKafkaSchemaRegistry implements 
KafkaSchemaRegistry<MD5Digest, Sch
    */
   @Override
   public MD5Digest register(String name, Schema schema) throws 
SchemaRegistryException {
-    return register(AvroUtils.switchName(schema, name));
+    PostMethod post = new PostMethod(url);
+    if (this.switchTopicNames) {
+      return register(AvroUtils.switchName(schema, name), post);
+    } else {
+      post.addParameter("name", name);
+      return register(schema, post);
+    }
   }
 
   /**
    * Register a schema to the Kafka schema registry
    *
    * @param schema
+   * @param post
    * @return schema ID of the registered schema
    * @throws SchemaRegistryException if registration failed
    */
-  public synchronized MD5Digest register(Schema schema) throws 
SchemaRegistryException {
+  public synchronized MD5Digest register(Schema schema, PostMethod post) 
throws SchemaRegistryException {
 
     // Change namespace if override specified
     if (this.namespaceOverride.isPresent()) {
@@ -184,7 +196,6 @@ public class LiKafkaSchemaRegistry implements 
KafkaSchemaRegistry<MD5Digest, Sch
 
     LOG.info("Registering schema " + schema.toString());
 
-    PostMethod post = new PostMethod(url);
     post.addParameter("schema", schema.toString());
 
     HttpClient httpClient = this.borrowClient();
@@ -258,5 +269,4 @@ public class LiKafkaSchemaRegistry implements 
KafkaSchemaRegistry<MD5Digest, Sch
 
     return schema;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261fce33/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
index 0303341..f6776c0 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterConfigurationKeys.java
@@ -47,5 +47,7 @@ public class KafkaWriterConfigurationKeys {
   static final String DEFAULT_VALUE_SERIALIZER = 
"org.apache.kafka.common.serialization.ByteArraySerializer";
   static final String CLIENT_ID_CONFIG = "client.id";
   static final String CLIENT_ID_DEFAULT = "gobblin";
+  static final String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME = 
"kafka.schemaRegistry.switchName";
+  static final String KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT = "true";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261fce33/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
index b717d43..3f52645 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
@@ -48,6 +48,7 @@ public class KafkaWriterHelper {
     setDefaultIfUnset(producerProperties, KEY_SERIALIZER_CONFIG, 
DEFAULT_KEY_SERIALIZER);
     setDefaultIfUnset(producerProperties, VALUE_SERIALIZER_CONFIG, 
DEFAULT_VALUE_SERIALIZER);
     setDefaultIfUnset(producerProperties, CLIENT_ID_CONFIG, CLIENT_ID_DEFAULT);
+    setDefaultIfUnset(producerProperties, KAFKA_SCHEMA_REGISTRY_SWITCH_NAME, 
KAFKA_SCHEMA_REGISTRY_SWITCH_NAME_DEFAULT);
     return producerProperties;
   }
 

Reply via email to