Repository: incubator-gobblin
Updated Branches:
  refs/heads/master bbf2c6ab6 -> a5281123a


[GOBBLIN-376] Added BytesToRecordWithMetadataConverter and 
RecordWithMetadataSchemaRegistrationConverter

Closes #2250 from aditya1105/metadata


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a5281123
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a5281123
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a5281123

Branch: refs/heads/master
Commit: a5281123a5912ae4990635f7b03f7a84068157e5
Parents: bbf2c6a
Author: aditya1105 <[email protected]>
Authored: Mon Jan 22 08:37:47 2018 -0800
Committer: Abhishek Tiwari <[email protected]>
Committed: Mon Jan 22 08:37:47 2018 -0800

----------------------------------------------------------------------
 gobblin-modules/gobblin-metadata/build.gradle   |  1 +
 .../BytesToRecordWithMetadataConverter.java     | 42 +++++++++++
 ...WithMetadataSchemaRegistrationConverter.java | 74 ++++++++++++++++++++
 3 files changed, 117 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a5281123/gobblin-modules/gobblin-metadata/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-metadata/build.gradle b/gobblin-modules/gobblin-metadata/build.gradle
index edb09c4..e982f93 100644
--- a/gobblin-modules/gobblin-metadata/build.gradle
+++ b/gobblin-modules/gobblin-metadata/build.gradle
@@ -24,6 +24,7 @@ dependencies {
    */
   compile project(":gobblin-api")
   compile project(":gobblin-modules:gobblin-codecs")
+  compile project(":gobblin-modules:gobblin-kafka-common")
   compile project(":gobblin-utility")
 
   compile externalDependency.gson

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a5281123/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/BytesToRecordWithMetadataConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/BytesToRecordWithMetadataConverter.java b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/BytesToRecordWithMetadataConverter.java
new file mode 100644
index 0000000..840462d
--- /dev/null
+++ b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/BytesToRecordWithMetadataConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import java.util.Collections;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metadata.types.Metadata;
+import org.apache.gobblin.type.RecordWithMetadata;
+
+
+/**
+ * A converter that takes an array of bytes and converts it to a {@link RecordWithMetadata}
+ * whose record is the input array of bytes and whose metadata is a newly constructed
+ * {@link Metadata} instance.
+ *
+ * <p>{@link #convertSchema} returns the input schema unchanged. {@link #convertRecord} emits
+ * exactly one output record per input record, as a singleton collection; its
+ * {@code outputSchema} and {@code workUnit} arguments are not consulted.
+ */
+public class BytesToRecordWithMetadataConverter extends Converter<Object, 
Object, byte[], RecordWithMetadata<?>> {
+  @Override
+  public Object convertSchema(Object inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    return inputSchema;
+  }
+
+  @Override
+  public Iterable<RecordWithMetadata<?>> convertRecord(Object outputSchema, 
byte[] inputRecord, WorkUnitState workUnit)
+      throws DataConversionException {
+    return Collections.singleton(new RecordWithMetadata<>(inputRecord, new 
Metadata()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a5281123/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/RecordWithMetadataSchemaRegistrationConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/RecordWithMetadataSchemaRegistrationConverter.java b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/RecordWithMetadataSchemaRegistrationConverter.java
new file mode 100644
index 0000000..7bf09f8
--- /dev/null
+++ b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/RecordWithMetadataSchemaRegistrationConverter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metadata.types.Metadata;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.type.RecordWithMetadata;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * A converter that takes a {@link RecordWithMetadata}, registers the Avro schema with a
+ * {@link KafkaAvroSchemaRegistry}, and returns a {@link RecordWithMetadata} carrying the
+ * resulting schema id inside its {@link Metadata}.
+ *
+ * <p>{@link #convertSchema} parses the input schema string as an Avro {@link Schema} and, if no
+ * schema id has been cached on this instance yet, registers it through a registry created by
+ * {@link KafkaAvroSchemaRegistryFactory} from the work unit's properties. A
+ * {@link SchemaRegistryException} is rethrown wrapped in a {@link SchemaConversionException}.
+ * The method returns the parsed schema's {@code toString()} form.
+ *
+ * <p>{@link #convertRecord} requires that a schema id has already been obtained (it fails a
+ * {@code Preconditions.checkNotNull} check otherwise). It sets the content type of the record's
+ * global metadata to {@code application/avro}, stores the schema id under the record-metadata
+ * key {@code Schema-Id}, and emits a single new {@link RecordWithMetadata} that wraps the
+ * original record together with that same, now-modified {@link Metadata} object.
+ *
+ * <p>NOTE(review): the schema id is cached in an instance field after the first successful
+ * registration, so later {@code convertSchema} calls on the same instance with a different
+ * schema would keep the first id -- presumably one converter instance handles one schema; confirm.
+ * NOTE(review): {@code Schema.parse(String)} is deprecated in recent Avro releases in favor of
+ * {@code new Schema.Parser().parse(String)} -- consider migrating; verify against the Avro
+ * version in use.
+ */
+public class RecordWithMetadataSchemaRegistrationConverter extends 
Converter<String, String, RecordWithMetadata<?>, RecordWithMetadata<?>> {
+  private static final String SCHEMA_ID_KEY = "Schema-Id";
+  private static final String CONTENT_TYPE = "application/avro";
+  private String schemaId;
+
+  @Override
+  public String convertSchema(String inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    Schema schema = Schema.parse(inputSchema);
+    if (null == schemaId) {
+      try {
+        schemaId = getSchemaId(workUnit.getProperties(), schema);
+      } catch (SchemaRegistryException e) {
+        throw new SchemaConversionException(e);
+      }
+    }
+    return schema.toString();
+  }
+
+  private static String getSchemaId(Properties properties, Schema schema)
+      throws SchemaRegistryException {
+    KafkaAvroSchemaRegistry kafkaAvroSchemaRegistry =
+        (KafkaAvroSchemaRegistry) new 
KafkaAvroSchemaRegistryFactory().create(properties);
+    return kafkaAvroSchemaRegistry.register(schema);
+  }
+
+  @Override
+  public Iterable<RecordWithMetadata<?>> convertRecord(String outputSchema, 
RecordWithMetadata<?> inputRecord,
+      WorkUnitState workUnit)
+      throws DataConversionException {
+    Preconditions.checkNotNull(schemaId);
+    Metadata metadata = inputRecord.getMetadata();
+    metadata.getGlobalMetadata().setContentType(CONTENT_TYPE);
+    metadata.getRecordMetadata().put(SCHEMA_ID_KEY, schemaId);
+    return Collections.singleton(new 
RecordWithMetadata<>(inputRecord.getRecord(), metadata));
+  }
+}

Reply via email to