Repository: incubator-gobblin Updated Branches: refs/heads/master bbf2c6ab6 -> a5281123a
[GOBBLIN-376] Added BytesToRecordWithMetadataConverter and RecordWithMetadataSchemaRegistrationConverter Closes #2250 from aditya1105/metadata Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a5281123 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a5281123 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a5281123 Branch: refs/heads/master Commit: a5281123a5912ae4990635f7b03f7a84068157e5 Parents: bbf2c6a Author: aditya1105 <[email protected]> Authored: Mon Jan 22 08:37:47 2018 -0800 Committer: Abhishek Tiwari <[email protected]> Committed: Mon Jan 22 08:37:47 2018 -0800 ---------------------------------------------------------------------- gobblin-modules/gobblin-metadata/build.gradle | 1 + .../BytesToRecordWithMetadataConverter.java | 42 +++++++++++ ...WithMetadataSchemaRegistrationConverter.java | 74 ++++++++++++++++++++ 3 files changed, 117 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a5281123/gobblin-modules/gobblin-metadata/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-metadata/build.gradle b/gobblin-modules/gobblin-metadata/build.gradle index edb09c4..e982f93 100644 --- a/gobblin-modules/gobblin-metadata/build.gradle +++ b/gobblin-modules/gobblin-metadata/build.gradle @@ -24,6 +24,7 @@ dependencies { */ compile project(":gobblin-api") compile project(":gobblin-modules:gobblin-codecs") + compile project(":gobblin-modules:gobblin-kafka-common") compile project(":gobblin-utility") compile externalDependency.gson http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a5281123/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/BytesToRecordWithMetadataConverter.java 
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/BytesToRecordWithMetadataConverter.java b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/BytesToRecordWithMetadataConverter.java
new file mode 100644
index 0000000..840462d
--- /dev/null
+++ b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/BytesToRecordWithMetadataConverter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import java.util.Collections;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metadata.types.Metadata;
+import org.apache.gobblin.type.RecordWithMetadata;
+
+
+/**
+ * A converter that wraps each incoming {@code byte[]} record in a {@link RecordWithMetadata}.
+ *
+ * <p>The output record holds the original byte array (it is not copied) together with a new,
+ * default-constructed {@link Metadata} instance. The schema is passed through unchanged.
+ */
+public class BytesToRecordWithMetadataConverter extends Converter<Object, Object, byte[], RecordWithMetadata<?>> {
+  /** Returns {@code inputSchema} unchanged; this converter does not transform schemas. */
+  @Override
+  public Object convertSchema(Object inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    return inputSchema;
+  }
+
+  /**
+   * Wraps {@code inputRecord} together with a new {@link Metadata} instance.
+   *
+   * @return a single-element iterable containing the wrapped record
+   */
+  @Override
+  public Iterable<RecordWithMetadata<?>> convertRecord(Object outputSchema, byte[] inputRecord, WorkUnitState workUnit)
+      throws DataConversionException {
+    return Collections.singleton(new RecordWithMetadata<>(inputRecord, new Metadata()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a5281123/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/RecordWithMetadataSchemaRegistrationConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/RecordWithMetadataSchemaRegistrationConverter.java b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/RecordWithMetadataSchemaRegistrationConverter.java
new file mode 100644
index 0000000..7bf09f8
--- /dev/null
+++ b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/RecordWithMetadataSchemaRegistrationConverter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metadata.types.Metadata;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.type.RecordWithMetadata;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * A converter that registers the incoming Avro schema with a Kafka Avro schema registry and
+ * tags every {@link RecordWithMetadata} with the resulting schema id.
+ *
+ * <p>For each record, the global metadata content type is set to {@code "application/avro"} and
+ * the record metadata key {@code "Schema-Id"} is set to the registered schema id. The input
+ * record's {@link Metadata} is modified in place and reused by the output record.
+ */
+public class RecordWithMetadataSchemaRegistrationConverter extends Converter<String, String, RecordWithMetadata<?>, RecordWithMetadata<?>> {
+  private static final String SCHEMA_ID_KEY = "Schema-Id";
+  private static final String CONTENT_TYPE = "application/avro";
+
+  /** Schema most recently registered by this instance; {@code null} until the first registration. */
+  private Schema registeredSchema;
+  /** Registry id of {@link #registeredSchema}; {@code null} until the first registration. */
+  private String schemaId;
+
+  /**
+   * Parses {@code inputSchema} as an Avro schema and registers it with the schema registry,
+   * unless this instance has already registered an equal schema.
+   *
+   * @param inputSchema Avro schema in its JSON string form
+   * @param workUnit supplies the properties used to create the schema registry client
+   * @return the parsed schema serialized back to JSON via {@link Schema#toString()}
+   * @throws SchemaConversionException if registering the schema fails
+   */
+  @Override
+  public String convertSchema(String inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    // Schema.parse(String) is deprecated; use a fresh Parser since a Parser remembers named types.
+    Schema schema = new Schema.Parser().parse(inputSchema);
+    // Re-register when the schema changes so records are never tagged with a stale schema id.
+    if (schemaId == null || !schema.equals(registeredSchema)) {
+      try {
+        schemaId = getSchemaId(workUnit.getProperties(), schema);
+        registeredSchema = schema;
+      } catch (SchemaRegistryException e) {
+        throw new SchemaConversionException(e);
+      }
+    }
+    return schema.toString();
+  }
+
+  /**
+   * Registers {@code schema} using a {@link KafkaAvroSchemaRegistry} created from {@code properties}.
+   *
+   * @return the schema id returned by the registry
+   * @throws SchemaRegistryException if registration with the registry fails
+   */
+  private static String getSchemaId(Properties properties, Schema schema)
+      throws SchemaRegistryException {
+    KafkaAvroSchemaRegistry kafkaAvroSchemaRegistry =
+        (KafkaAvroSchemaRegistry) new KafkaAvroSchemaRegistryFactory().create(properties);
+    return kafkaAvroSchemaRegistry.register(schema);
+  }
+
+  /**
+   * Sets the Avro content type and the registered schema id on {@code inputRecord}'s metadata.
+   *
+   * @return a single-element iterable with the input payload and its (mutated) metadata
+   * @throws NullPointerException if no schema has been registered via {@link #convertSchema}
+   */
+  @Override
+  public Iterable<RecordWithMetadata<?>> convertRecord(String outputSchema, RecordWithMetadata<?> inputRecord,
+      WorkUnitState workUnit)
+      throws DataConversionException {
+    Preconditions.checkNotNull(schemaId, "No schema id registered; convertSchema must succeed before convertRecord");
+    Metadata metadata = inputRecord.getMetadata();
+    metadata.getGlobalMetadata().setContentType(CONTENT_TYPE);
+    metadata.getRecordMetadata().put(SCHEMA_ID_KEY, schemaId);
+    return Collections.singleton(new RecordWithMetadata<>(inputRecord.getRecord(), metadata));
+  }
+}
