Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 42677dc8c -> ce60d2c7c


[GOBBLIN-195] Ability to switch Avro schema namespace before registering 
with Kafka Avro Schema registry

Closes #2049 from
abti/avro_schema_namespace_switch


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ce60d2c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ce60d2c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ce60d2c7

Branch: refs/heads/master
Commit: ce60d2c7c4e3df1ac85b49993d4a3ccfafc40d2c
Parents: 42677dc
Author: Abhishek Tiwari <abhishektiwari.bt...@gmail.com>
Authored: Wed Aug 9 13:03:09 2017 -0700
Committer: Hung Tran <hut...@linkedin.com>
Committed: Wed Aug 9 13:03:09 2017 -0700

----------------------------------------------------------------------
 .../KafkaSchemaRegistryConfigurationKeys.java   |   1 +
 .../kafka/schemareg/LiKafkaSchemaRegistry.java  |  11 ++
 .../metrics/kafka/KafkaAvroSchemaRegistry.java  |  11 ++
 .../reporter/util/KafkaAvroReporterUtil.java    |  75 +++++++++++++
 .../java/org/apache/gobblin/util/AvroUtils.java | 109 +++++++++++++++++++
 .../org/apache/gobblin/util/AvroUtilsTest.java  |  23 ++++
 6 files changed, 230 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
index d6f023b..9158663 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/KafkaSchemaRegistryConfigurationKeys.java
@@ -24,4 +24,5 @@ public class KafkaSchemaRegistryConfigurationKeys {
   public final static String KAFKA_SCHEMA_REGISTRY_CLASS = 
"kafka.schemaRegistry.class";
   public final static String KAFKA_SCHEMA_REGISTRY_URL = 
"kafka.schemaRegistry.url";
   public final static String KAFKA_SCHEMA_REGISTRY_CACHE = 
"kafka.schemaRegistry.cache";
+  public final static String KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE = 
"kafka.schemaRegistry.overrideNamespace";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
index 2ea7075..e6afbae 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/schemareg/LiKafkaSchemaRegistry.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.kafka.schemareg;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.avro.Schema;
@@ -29,9 +30,11 @@ import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -53,6 +56,7 @@ public class LiKafkaSchemaRegistry implements 
KafkaSchemaRegistry<MD5Digest, Sch
 
   private final GenericObjectPool<HttpClient> httpClientPool;
   private final String url;
+  private final Optional<Map<String, String>> namespaceOverride;
 
   /**
    * @param props properties should contain property 
"kafka.schema.registry.url", and optionally
@@ -64,6 +68,7 @@ public class LiKafkaSchemaRegistry implements 
KafkaSchemaRegistry<MD5Digest, Sch
         String.format("Property %s not provided.", 
KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL));
 
     this.url = 
props.getProperty(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_URL);
+    this.namespaceOverride = 
KafkaAvroReporterUtil.extractOverrideNamespace(props);
 
     int objPoolSize =
         
Integer.parseInt(props.getProperty(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
@@ -171,6 +176,12 @@ public class LiKafkaSchemaRegistry implements 
KafkaSchemaRegistry<MD5Digest, Sch
    * @throws SchemaRegistryException if registration failed
    */
   public synchronized MD5Digest register(Schema schema) throws 
SchemaRegistryException {
+
+    // Change namespace if override specified
+    if (this.namespaceOverride.isPresent()) {
+      schema = AvroUtils.switchNamespace(schema, this.namespaceOverride.get());
+    }
+
     LOG.info("Registering schema " + schema.toString());
 
     PostMethod post = new PostMethod(url);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
index d391ef3..4c155fb 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaAvroSchemaRegistry.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.metrics.kafka;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
@@ -30,9 +31,11 @@ import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -59,6 +62,7 @@ public class KafkaAvroSchemaRegistry extends 
KafkaSchemaRegistry<String, Schema>
 
   private final GenericObjectPool<HttpClient> httpClientPool;
   private final String url;
+  private final Optional<Map<String, String>> namespaceOverride;
 
   /**
    * @param properties properties should contain property 
"kafka.schema.registry.url", and optionally
@@ -71,6 +75,7 @@ public class KafkaAvroSchemaRegistry extends 
KafkaSchemaRegistry<String, Schema>
         String.format("Property %s not provided.", KAFKA_SCHEMA_REGISTRY_URL));
 
     this.url = props.getProperty(KAFKA_SCHEMA_REGISTRY_URL);
+    this.namespaceOverride = 
KafkaAvroReporterUtil.extractOverrideNamespace(props);
 
     int objPoolSize =
         
Integer.parseInt(props.getProperty(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
@@ -186,6 +191,12 @@ public class KafkaAvroSchemaRegistry extends 
KafkaSchemaRegistry<String, Schema>
    */
   @Override
   public synchronized String register(Schema schema) throws 
SchemaRegistryException {
+
+    // Change namespace if override specified
+    if (this.namespaceOverride.isPresent()) {
+      schema = AvroUtils.switchNamespace(schema, this.namespaceOverride.get());
+    }
+
     LOG.info("Registering schema " + schema.toString());
 
     PostMethod post = new PostMethod(url);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java
new file mode 100644
index 0000000..1d82921
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/reporter/util/KafkaAvroReporterUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metrics.reporter.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+
+public class KafkaAvroReporterUtil {
+
+  private static final Splitter SPLIT_BY_COMMA = 
Splitter.on(",").omitEmptyStrings().trimResults();
+  private static final Splitter SPLIT_BY_COLON = 
Splitter.on(":").omitEmptyStrings().trimResults();
+
+  /***
+   * Extracts the map of namespaces to override in Kafka schema from the given 
Properties. Entries are comma-separated; each entry is original:replacement.
+   *
+   * Example config:
+   * kafka.schemaRegistry.overrideNamespace = 
namespace1:replacement1,namespace2:replacement2
+   *
+   * For the above example, this method will create a Map with values:
+   * {
+   *   "namespace1" : "replacement1",
+   *   "namespace2" : "replacement2"
+   * }
+   *
+   * @param properties Properties that may contain the override-namespace key; 
an entry that does not split into exactly two parts raises a RuntimeException.
+   * @return Map of original namespace to replacement namespace; absent if the 
key is missing or its value yields no entries.
+   */
+  public static Optional<Map<String, String>> 
extractOverrideNamespace(Properties properties) {
+    if 
(properties.containsKey(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE))
 {
+
+      Map<String, String> namespaceOverridesMap = Maps.newHashMap();
+      List<String> namespaceOverrides = 
Lists.newArrayList(SPLIT_BY_COMMA.split(properties
+          
.getProperty(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE)));
+
+      for (String namespaceOverride : namespaceOverrides) {
+        List<String> override = 
Lists.newArrayList(SPLIT_BY_COLON.split(namespaceOverride));
+        if (override.size() != 2) {
+          throw new RuntimeException("Namespace override should be of the 
format originalNamespace:replacementNamespace,"
+              + " found: " + namespaceOverride);
+        }
+        namespaceOverridesMap.put(override.get(0), override.get(1));
+      }
+
+      // The splitter drops empty pieces, so a value with no usable entries leaves the map empty: report absent
+      if (namespaceOverridesMap.size() != 0) {
+        return Optional.of(namespaceOverridesMap);
+      }
+    }
+
+    return Optional.<Map<String, String>>absent();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 694a40b..d09a6d9 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -49,6 +49,7 @@ import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.mapred.FsInput;
 import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -56,6 +57,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.codehaus.jackson.JsonNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -612,6 +614,113 @@ public class AvroUtils {
   }
 
   /**
+   * Recursively copies the input {@link org.apache.avro.Schema}, replacing the 
namespace of each record, enum and fixed type whose namespace is a key of the override map.
+   * @param schema {@link org.apache.avro.Schema} to copy.
+   * @param namespaceOverride map from original namespace to its replacement; 
namespaces that are not keys of the map are kept unchanged.
+   * @return A {@link org.apache.avro.Schema} that is a copy of schema with the 
mapped namespaces replaced; throws AvroRuntimeException for an unhandled schema type.
+   */
+  public static Schema switchNamespace(Schema schema, Map<String, String> 
namespaceOverride) {
+    Schema newSchema;
+    String newNamespace = StringUtils.EMPTY;
+
+    // Rebuild the schema by type; map, record, union and array recurse into their children
+    // (primitives are re-created with Schema.create)
+    switch (schema.getType()) {
+      case ENUM:
+        newNamespace = namespaceOverride.containsKey(schema.getNamespace()) ? 
namespaceOverride.get(schema.getNamespace())
+            : schema.getNamespace();
+        newSchema =
+            Schema.createEnum(schema.getName(), schema.getDoc(), newNamespace, 
schema.getEnumSymbols());
+        break;
+      case FIXED:
+        newNamespace = namespaceOverride.containsKey(schema.getNamespace()) ? 
namespaceOverride.get(schema.getNamespace())
+            : schema.getNamespace();
+        newSchema =
+            Schema.createFixed(schema.getName(), schema.getDoc(), 
newNamespace, schema.getFixedSize());
+        break;
+      case MAP:
+        newSchema = Schema.createMap(switchNamespace(schema.getValueType(), 
namespaceOverride));
+        break;
+      case RECORD:
+        newNamespace = namespaceOverride.containsKey(schema.getNamespace()) ? 
namespaceOverride.get(schema.getNamespace())
+            : schema.getNamespace();
+        List<Schema.Field> newFields = new ArrayList<>();
+        if (schema.getFields().size() > 0) {
+          for (Schema.Field oldField : schema.getFields()) {
+            Field newField = new Field(oldField.name(), 
switchNamespace(oldField.schema(), namespaceOverride), oldField.doc(),
+                oldField.defaultValue(), oldField.order());
+            newFields.add(newField);
+          }
+        }
+        newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), 
newNamespace,
+            schema.isError());
+        newSchema.setFields(newFields);
+        break;
+      case UNION:
+        List<Schema> newUnionMembers = new ArrayList<>();
+        if (null != schema.getTypes() && schema.getTypes().size() > 0) {
+          for (Schema oldUnionMember : schema.getTypes()) {
+            newUnionMembers.add(switchNamespace(oldUnionMember, 
namespaceOverride));
+          }
+        }
+        newSchema = Schema.createUnion(newUnionMembers);
+        break;
+      case ARRAY:
+        newSchema = 
Schema.createArray(switchNamespace(schema.getElementType(), namespaceOverride));
+        break;
+      case BOOLEAN:
+      case BYTES:
+      case DOUBLE:
+      case FLOAT:
+      case INT:
+      case LONG:
+      case NULL:
+      case STRING:
+        newSchema = Schema.create(schema.getType());
+        break;
+      default:
+        String exceptionMessage = String.format("Schema namespace replacement 
failed for \"%s\" ", schema);
+        LOG.error(exceptionMessage);
+
+        throw new AvroRuntimeException(exceptionMessage);
+    }
+
+    // Copy the schema-level JSON properties of the original onto the copy
+    copyProperties(schema, newSchema);
+
+    return newSchema;
+  }
+
+  /***
+   * Copy the JSON properties (as returned by getJsonProps()) from old Avro Schema to new Avro Schema
+   * @param oldSchema Old Avro Schema to copy properties from; must not be null
+   * @param newSchema New Avro Schema to copy properties to; must not be null
+   */
+  private static void copyProperties(Schema oldSchema, Schema newSchema) {
+    Preconditions.checkNotNull(oldSchema);
+    Preconditions.checkNotNull(newSchema);
+
+    Map<String, JsonNode> props = oldSchema.getJsonProps();
+    copyProperties(props, newSchema);
+  }
+
+  /***
+   * Add every entry of a property map to an Avro Schema via addProp
+   * @param props Properties to copy to Avro Schema; may be null, in which case nothing is copied
+   * @param schema Avro Schema to copy properties to; must not be null
+   */
+  private static void copyProperties(Map<String, JsonNode> props, Schema 
schema) {
+    Preconditions.checkNotNull(schema);
+
+    // (a null map is tolerated: nothing is copied and no exception is thrown)
+    if (null != props) {
+      for (Map.Entry<String, JsonNode> prop : props.entrySet()) {
+        schema.addProp(prop.getKey(), prop.getValue());
+      }
+    }
+  }
+
+  /**
    * Serialize a generic record as a relative {@link Path}. Useful for 
converting {@link GenericRecord} type keys
    * into file system locations. For example {field1=v1, field2=v2} returns 
field1=v1/field2=v2 if includeFieldNames
    * is true, or v1/v2 if it is false. Illegal HDFS tokens such as ':' and 
'\\' will be replaced with '_'.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ce60d2c7/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index cadc507..e89b8b9 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -37,6 +39,8 @@ import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Maps;
+
 
 public class AvroUtilsTest {
   private static final String AVRO_DIR = 
"gobblin-utility/src/test/resources/avroDirParent/";
@@ -196,6 +200,25 @@ public class AvroUtilsTest {
 
   }
 
+  @Test
+  public void testSwitchNamespace() {
+    String originalNamespace = "originalNamespace";
+    String originalName = "originalName";
+    String newNamespace = "newNamespace";
+    Schema schema = 
SchemaBuilder.builder(originalNamespace).record(originalName).fields().
+        requiredDouble("double").optionalFloat("float").endRecord();
+
+    Map<String, String> map = Maps.newHashMap();
+    map.put(originalNamespace, newNamespace); // override: originalNamespace -> newNamespace
+    Schema newSchema = AvroUtils.switchNamespace(schema, map);
+
+    Assert.assertEquals(newSchema.getNamespace(), newNamespace); // record namespace was replaced
+    Assert.assertEquals(newSchema.getName(), originalName); // record name is untouched
+    for(Schema.Field field : newSchema.getFields()) {
+      Assert.assertEquals(field, schema.getField(field.name())); // each copied field equals its original
+    }
+  }
+
   @Test public void testSerializeAsPath() throws Exception {
 
     Schema schema =

Reply via email to