Repository: incubator-gobblin Updated Branches: refs/heads/master 2485282d2 -> ea5047ea2
[GOBBLIN-238] Implement EnvelopePayloadExtractor and EnvelopePayloadDeserializer Closes #2099 from zxcware/envelope2 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ea5047ea Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ea5047ea Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ea5047ea Branch: refs/heads/master Commit: ea5047ea2665b7bee99352a7efe1cde625e88047 Parents: 2485282 Author: zhchen <[email protected]> Authored: Mon Sep 11 18:15:21 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Sep 11 18:15:21 2017 -0700 ---------------------------------------------------------------------- .../converter/BaseEnvelopeSchemaConverter.java | 139 +++++++++++++++++++ .../converter/EnvelopePayloadConverter.java | 97 +++++++++++++ .../EnvelopePayloadExtractingConverter.java | 48 +++++++ .../converter/EnvelopeSchemaConverter.java | 3 + .../converter/EnvelopePayloadConverterTest.java | 103 ++++++++++++++ .../EnvelopePayloadExtractingConverterTest.java | 103 ++++++++++++++ .../converter/EnvelopeSchemaConverterTest.java | 3 + .../KafkaAvroSchemaRegistryForTest.java | 3 + .../src/test/resources/converter/envelope.avro | Bin 0 -> 658 bytes .../src/test/resources/converter/envelope.avsc | 46 ++++++ .../src/test/resources/converter/record.avsc | 17 +++ 11 files changed, 562 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gobblin.converter;

import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistryFactory;
import org.apache.gobblin.util.AvroUtils;

import com.google.common.base.Optional;


/**
 * Base class for an envelope schema converter using a {@link KafkaSchemaRegistry}.
 *
 * <p>An envelope record carries a serialized payload (a bytes field) together with the id of the
 * schema the payload was written with. Subclasses use {@link #upConvertPayload(GenericRecord)} to
 * deserialize the payload with its writer schema (looked up by id in the registry) and up-convert
 * it to the latest schema registered under the configured payload schema topic.
 *
 * @param <P> type of the deserialized payload object produced by {@link #upConvertPayload(GenericRecord)}
 */
public abstract class BaseEnvelopeSchemaConverter<P> extends Converter<Schema, Schema, GenericRecord, GenericRecord> {
  // Configuration keys
  public static final String PAYLOAD_SCHEMA_ID_FIELD = "converter.envelopeSchemaConverter.schemaIdField";
  public static final String PAYLOAD_FIELD = "converter.envelopeSchemaConverter.payloadField";
  public static final String PAYLOAD_SCHEMA_TOPIC = "converter.envelopeSchemaConverter.payloadSchemaTopic";
  public static final String KAFKA_REGISTRY_FACTORY = "converter.envelopeSchemaConverter.kafkaRegistryFactory";

  // Defaults
  public static final String DEFAULT_PAYLOAD_FIELD = "payload";
  public static final String DEFAULT_PAYLOAD_SCHEMA_ID_FIELD = "payloadSchemaId";
  public static final String DEFAULT_KAFKA_SCHEMA_REGISTRY_FACTORY_CLASS =
      "org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory";

  // Name of the record field holding the payload writer-schema id
  protected String payloadSchemaIdField;
  // Name of the record field holding the serialized payload bytes
  protected String payloadField;
  // Registry topic used to look up the latest payload schema
  protected String payloadSchemaTopic;
  // Reader whose reader schema is the latest payload schema; its writer schema is set per record
  protected GenericDatumReader<P> latestPayloadReader;
  protected KafkaSchemaRegistry registry;

  /**
   * Initialize the converter: resolve the payload/schema-id field names, the payload schema topic,
   * and instantiate the {@link KafkaSchemaRegistry} via the configured factory class.
   *
   * @throws RuntimeException if {@value #PAYLOAD_SCHEMA_TOPIC} is not configured or the registry
   *         factory class cannot be loaded or instantiated
   */
  @Override
  public BaseEnvelopeSchemaConverter init(WorkUnitState workUnit) {
    super.init(workUnit);

    payloadSchemaIdField = workUnit.getProp(PAYLOAD_SCHEMA_ID_FIELD, DEFAULT_PAYLOAD_SCHEMA_ID_FIELD);
    payloadField = workUnit.getProp(PAYLOAD_FIELD, DEFAULT_PAYLOAD_FIELD);

    // Get the schema specific topic to fetch the schema in the registry
    if (!workUnit.contains(PAYLOAD_SCHEMA_TOPIC)) {
      throw new RuntimeException("Configuration not found: " + PAYLOAD_SCHEMA_TOPIC);
    }
    payloadSchemaTopic = workUnit.getProp(PAYLOAD_SCHEMA_TOPIC);

    String registryFactoryField = workUnit.getProp(KAFKA_REGISTRY_FACTORY, DEFAULT_KAFKA_SCHEMA_REGISTRY_FACTORY_CLASS);
    try {
      // asSubclass replaces the original unchecked cast and fails fast with a clear
      // ClassCastException if a non-factory class is configured
      KafkaSchemaRegistryFactory registryFactory =
          Class.forName(registryFactoryField).asSubclass(KafkaSchemaRegistryFactory.class).newInstance();
      registry = registryFactory.create(workUnit.getProperties());
    } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
      throw new RuntimeException(e);
    }
    return this;
  }

  /**
   * Get the payload schema
   *
   * @param inputRecord the input record which has the payload
   * @return the current (writer) schema of the payload, resolved from the registry by the
   *         schema id stored in the record
   * @throws Exception if the schema id field is absent or the registry lookup fails
   */
  protected Schema getPayloadSchema(GenericRecord inputRecord)
      throws Exception {
    Optional<Object> schemaIdValue = AvroUtils.getFieldValue(inputRecord, payloadSchemaIdField);
    if (!schemaIdValue.isPresent()) {
      throw new Exception("Schema id with key " + payloadSchemaIdField + " not found in the record");
    }
    String schemaKey = String.valueOf(schemaIdValue.get());
    return (Schema) registry.getSchemaByKey(schemaKey);
  }

  /**
   * Get payload field and convert to byte array
   *
   * @param inputRecord the input record which has the payload
   * @return a copy of the payload bytes in the input record
   */
  protected byte[] getPayloadBytes(GenericRecord inputRecord) {
    // Work on a duplicate so the record's buffer position is left untouched, making repeated
    // reads of the same record safe. The previous implementation returned bb.array() when a
    // backing array was present, which is incorrect for buffers with a non-zero
    // arrayOffset/position (it exposes bytes outside the logical content), and otherwise
    // consumed the buffer so a second call would return nothing.
    ByteBuffer bb = ((ByteBuffer) inputRecord.get(payloadField)).duplicate();
    byte[] payloadBytes = new byte[bb.remaining()];
    bb.get(payloadBytes);
    return payloadBytes;
  }

  /**
   * Fetch the latest payload schema for {@link #payloadSchemaTopic} from the registry and
   * (re)create {@link #latestPayloadReader} with it as the reader schema.
   */
  protected Schema fetchLatestPayloadSchema() throws Exception {
    Schema latestPayloadSchema = (Schema) registry.getLatestSchemaByTopic(payloadSchemaTopic);
    latestPayloadReader = new GenericDatumReader<>(latestPayloadSchema);
    return latestPayloadSchema;
  }

  /**
   * Convert the payload in the input record to a deserialized object with the latest schema
   *
   * @param inputRecord the input record
   * @return the schema'ed payload object
   */
  protected P upConvertPayload(GenericRecord inputRecord) throws DataConversionException {
    try {
      if (latestPayloadReader == null) {
        // Lazily initialize the reader in case fetchLatestPayloadSchema() has not been
        // invoked yet (previously this would surface as a NullPointerException)
        fetchLatestPayloadSchema();
      }
      Schema payloadSchema = getPayloadSchema(inputRecord);
      // Set writer schema
      latestPayloadReader.setSchema(payloadSchema);

      byte[] payloadBytes = getPayloadBytes(inputRecord);
      Decoder decoder = DecoderFactory.get().binaryDecoder(payloadBytes, null);

      // 'latestPayloadReader.read' will convert the record from 'payloadSchema' to the latest payload schema
      return latestPayloadReader.read(null, decoder);
    } catch (Exception e) {
      throw new DataConversionException(e);
    }
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.converter; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.gobblin.configuration.WorkUnitState; + + +/** + * A converter decorates the envelope record with its payload deserialized into schema'ed object + * + * <p> Given an envelope schema as the input schema, the output schema will have the payload + * field, configured by key {@value PAYLOAD_FIELD}, set with its latest schema fetched from a + * {@link #registry} (see {@code createDecoratedField(Field)}). The converter copies the other fields + * from the input schema to the output schema + * + * <p> Given an envelope record as the input record, the output record will have the payload set + * to its deserialized object using the latest schema (see {@code convertPayload(GenericRecord)}). 
+ * The converter copies the other fields from the input record to the output record + * + * <p> If the current payload schema is incompatible with its latest schema, {@code convertPayload(GenericRecord)} + * will throw an exception and the job fail + */ + +public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<GenericRecord> { + public static final String DECORATED_PAYLOAD_DOC = "Decorated payload data"; + + @Override + public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit) + throws SchemaConversionException { + List<Field> outputSchemaFields = new ArrayList<>(); + for (Field field : inputSchema.getFields()) { + if (field.name().equals(payloadField)) { + // Decorate the field with full schema + outputSchemaFields.add(createDecoratedField(field)); + } else { + // Make a copy of the field to the output schema + outputSchemaFields.add(new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order())); + } + } + + Schema outputSchema = Schema + .createRecord(inputSchema.getName(), inputSchema.getDoc(), inputSchema.getNamespace(), inputSchema.isError()); + outputSchema.setFields(outputSchemaFields); + return outputSchema; + } + + /** + * Create a payload field with its latest schema fetched from {@link #registry} + * + * @param field the original payload field from input envelope schema + * @return a new payload field with its latest schema + */ + private Field createDecoratedField(Field field) throws SchemaConversionException { + try { + Schema payloadSchema = fetchLatestPayloadSchema(); + return new Field(field.name(), payloadSchema, DECORATED_PAYLOAD_DOC, field.defaultValue(), field.order()); + } catch (Exception e) { + throw new SchemaConversionException(e); + } + } + + @Override + public Iterable<GenericRecord> convertRecord(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit) + throws DataConversionException { + GenericRecord outputRecord = new GenericData.Record(outputSchema); + for 
(Field field : inputRecord.getSchema().getFields()) { + if (field.name().equals(payloadField)) { + outputRecord.put(payloadField, upConvertPayload(inputRecord)); + } else { + outputRecord.put(field.name(), inputRecord.get(field.name())); + } + } + return new SingleRecordIterable<>(outputRecord); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverter.java new file mode 100644 index 0000000..b759dfd --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.converter; + +import org.apache.gobblin.configuration.WorkUnitState; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +/** + * A converter for extracting schema/records from an envelope schema. + * Input schema: envelope schema - must have fields payloadSchemaId (the schema registry key of the output + * schema) and payload (byte data for output record) + * Input record: record corresponding to input schema + * Output schema: latest schema obtained from schema registry with topic {@link #PAYLOAD_SCHEMA_TOPIC} + * Output record: record corresponding to output schema obtained from input record's {@link #PAYLOAD_FIELD} as bytes + */ +public class EnvelopePayloadExtractingConverter extends BaseEnvelopeSchemaConverter<GenericRecord> { + @Override + public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit) throws SchemaConversionException { + try { + return fetchLatestPayloadSchema(); + } catch (Exception e) { + throw new SchemaConversionException(e); + } + } + + @Override + public Iterable<GenericRecord> convertRecord(Schema outputSchema, GenericRecord inputRecord, WorkUnitState workUnit) + throws DataConversionException { + return new SingleRecordIterable<>(upConvertPayload(inputRecord)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java index 8696f13..124fe4c 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java +++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopeSchemaConverter.java @@ -47,7 +47,10 @@ import org.apache.avro.io.DecoderFactory; * Input record: record corresponding to input schema * Output schema: schema obtained from schema registry using key provided in input record's {@link #PAYLOAD_SCHEMA_ID_FIELD} * Output record: record corresponding to output schema obtained from input record's {@link #PAYLOAD_FIELD} as bytes + * + * @deprecated use {@link EnvelopePayloadExtractingConverter} */ +@Deprecated public class EnvelopeSchemaConverter extends Converter<Schema, String, GenericRecord, GenericRecord> { public static final String PAYLOAD_SCHEMA_ID_FIELD = "EnvelopeSchemaConverter.schemaIdField"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadConverterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadConverterTest.java new file mode 100644 index 0000000..561dfa0 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadConverterTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.converter; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.FileUtils; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory; +import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry; +import org.apache.gobblin.metrics.kafka.SchemaRegistryException; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.Iterables; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +/** + * Unit test for {@link EnvelopePayloadConverter} + */ +public class EnvelopePayloadConverterTest { + private static final KafkaSchemaRegistry mockRegistry = mock(KafkaSchemaRegistry.class); + + @Test + public void testConverter() + throws IOException, DataConversionException, SchemaRegistryException { + Schema inputSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/envelope.avsc")); + GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(inputSchema); + + File tmp = File.createTempFile(getClass().getSimpleName(), null); + FileUtils.copyInputStreamToFile(getClass().getResourceAsStream("/converter/envelope.avro"), tmp); + 
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(tmp, datumReader); + GenericRecord inputRecord = dataFileReader.next(); + + Schema latestPayloadSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/record.avsc")); + when(mockRegistry.getLatestSchemaByTopic(any())).thenReturn(latestPayloadSchema); + when(mockRegistry.getSchemaByKey(any())).thenReturn(inputSchema.getField("nestedRecord").schema()); + + WorkUnitState workUnitState = new WorkUnitState(); + workUnitState.setProp(BaseEnvelopeSchemaConverter.PAYLOAD_SCHEMA_TOPIC, "test"); + workUnitState.setProp(BaseEnvelopeSchemaConverter.PAYLOAD_SCHEMA_ID_FIELD, "metadata.payloadSchemaId"); + workUnitState + .setProp(BaseEnvelopeSchemaConverter.KAFKA_REGISTRY_FACTORY, MockKafkaAvroSchemaRegistryFactory.class.getName()); + + EnvelopePayloadConverter converter = new EnvelopePayloadConverter(); + converter.init(workUnitState); + + Schema outputSchema = converter.convertSchema(inputSchema, workUnitState); + List<GenericRecord> outputRecords = new ArrayList<>(); + Iterables.addAll(outputRecords, converter.convertRecord(outputSchema, inputRecord, workUnitState)); + Assert.assertTrue(outputRecords.size() == 1); + + GenericRecord outputRecord = outputRecords.get(0); + GenericRecord payload = (GenericRecord) outputRecord.get("payload"); + // While making the test envelope avro record, its nestedRecord was intentionally set to the deserialized payload + GenericRecord expectedPayload = (GenericRecord) outputRecord.get("nestedRecord"); + + Schema payloadSchema = payload.getSchema(); + Schema expectedPayloadSchema = expectedPayload.getSchema(); + // The expected payload schema has the same number of fields as payload schema but in different order + Assert.assertTrue(expectedPayloadSchema.getName().equals(payloadSchema.getName())); + Assert.assertTrue(expectedPayloadSchema.getNamespace().equals(payloadSchema.getNamespace())); + Assert.assertTrue(expectedPayloadSchema.getFields().size() == 
payloadSchema.getFields().size()); + + for (Schema.Field field : payload.getSchema().getFields()) { + Assert.assertTrue(expectedPayload.get(field.name()).equals(payload.get(field.name()))); + } + } + + static class MockKafkaAvroSchemaRegistryFactory extends KafkaAvroSchemaRegistryFactory { + @Override + public KafkaSchemaRegistry create(Properties props) { + return mockRegistry; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverterTest.java new file mode 100644 index 0000000..1aa517e --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopePayloadExtractingConverterTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.converter; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.commons.io.FileUtils; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory; +import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.Iterables; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +/** + * Unit test for {@link EnvelopePayloadExtractingConverter}. + */ +@Test(groups = {"gobblin.converter"}) +public class EnvelopePayloadExtractingConverterTest { + private static final KafkaSchemaRegistry mockRegistry = mock(KafkaSchemaRegistry.class); + + @Test + public void testConverter() + throws Exception { + Schema inputSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/envelope.avsc")); + GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(inputSchema); + + File tmp = File.createTempFile(getClass().getSimpleName(), null); + FileUtils.copyInputStreamToFile(getClass().getResourceAsStream("/converter/envelope.avro"), tmp); + DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(tmp, datumReader); + GenericRecord inputRecord = dataFileReader.next(); + + Schema latestPayloadSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/record.avsc")); + when(mockRegistry.getLatestSchemaByTopic(any())).thenReturn(latestPayloadSchema); + when(mockRegistry.getSchemaByKey(any())).thenReturn(inputSchema.getField("nestedRecord").schema()); + + WorkUnitState workUnitState = new WorkUnitState(); + 
workUnitState.setProp(BaseEnvelopeSchemaConverter.PAYLOAD_SCHEMA_TOPIC, "test"); + workUnitState.setProp(BaseEnvelopeSchemaConverter.PAYLOAD_SCHEMA_ID_FIELD, "metadata.payloadSchemaId"); + workUnitState.setProp(BaseEnvelopeSchemaConverter.KAFKA_REGISTRY_FACTORY, + EnvelopePayloadExtractingConverterTest.MockKafkaAvroSchemaRegistryFactory.class.getName()); + + EnvelopePayloadExtractingConverter converter = new EnvelopePayloadExtractingConverter(); + converter.init(workUnitState); + + Schema outputSchema = converter.convertSchema(inputSchema, workUnitState); + Assert.assertTrue(outputSchema.equals(latestPayloadSchema)); + + List<GenericRecord> outputRecords = new ArrayList<>(); + Iterables.addAll(outputRecords, converter.convertRecord(outputSchema, inputRecord, workUnitState)); + Assert.assertTrue(outputRecords.size() == 1); + + GenericRecord payload = outputRecords.get(0); + // While making the test envelope avro input record, its nestedRecord was intentionally set to the deserialized payload + GenericRecord expectedPayload = (GenericRecord) inputRecord.get("nestedRecord"); + + Schema payloadSchema = payload.getSchema(); + Schema expectedPayloadSchema = expectedPayload.getSchema(); + // The expected payload schema has the same number of fields as payload schema but in different order + Assert.assertTrue(expectedPayloadSchema.getName().equals(payloadSchema.getName())); + Assert.assertTrue(expectedPayloadSchema.getNamespace().equals(payloadSchema.getNamespace())); + Assert.assertTrue(expectedPayloadSchema.getFields().size() == payloadSchema.getFields().size()); + + for (Schema.Field field : payload.getSchema().getFields()) { + Assert.assertTrue(expectedPayload.get(field.name()).equals(payload.get(field.name()))); + } + } + + static class MockKafkaAvroSchemaRegistryFactory extends KafkaAvroSchemaRegistryFactory { + @Override + public KafkaSchemaRegistry create(Properties props) { + return mockRegistry; + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopeSchemaConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopeSchemaConverterTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopeSchemaConverterTest.java index a00e2c0..974e7b2 100644 --- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopeSchemaConverterTest.java +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/EnvelopeSchemaConverterTest.java @@ -31,8 +31,11 @@ import static org.mockito.Mockito.when; /** * Unit test for {@link EnvelopeSchemaConverter}. + * + * @deprecated As a result of deprecating {@link EnvelopeSchemaConverter} */ @Test(groups = {"gobblin.converter"}) +@Deprecated public class EnvelopeSchemaConverterTest { public static final String SCHEMA_KEY = "testKey"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaAvroSchemaRegistryForTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaAvroSchemaRegistryForTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaAvroSchemaRegistryForTest.java index ea960db..81ca09a 100644 --- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaAvroSchemaRegistryForTest.java +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaAvroSchemaRegistryForTest.java @@ -25,7 +25,10 @@ import org.apache.avro.Schema; /** * Override some methods of {@link KafkaAvroSchemaRegistry} for 
use in {@link EnvelopeSchemaConverterTest} + * + * @deprecated Checkout {@link EnvelopePayloadExtractingConverterTest} for how to mock a {@link KafkaSchemaRegistry} */ +@Deprecated public class KafkaAvroSchemaRegistryForTest extends KafkaAvroSchemaRegistry { public static class Factory implements KafkaSchemaRegistryFactory { public Factory() {} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avro ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avro b/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avro new file mode 100644 index 0000000..75745b3 Binary files /dev/null and b/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avro differ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avsc ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avsc b/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avsc new file mode 100644 index 0000000..f579ae5 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/envelope.avsc @@ -0,0 +1,46 @@ +{ + "type": "record", + "name": "EnvelopeRecord", + "namespace": "org.apache.gobblin.test", + "fields": [ + { + "name": "metadata", + "type": { + "type": "map", + "values": "string" + }, + "doc": "record metadata." + }, + { + "name": "key", + "type": "bytes", + "doc": "serialized key." + }, + { + "name": "payload", + "type": "bytes", + "doc": "serialized payload data." 
+ }, + { + "name": "nestedRecord", + "type": { + "type": "record", + "name": "SimpleRecord", + "namespace": "org.apache.gobblin.test", + "fields": [ + { + "name": "id", + "type": "string", + "doc": "ID of the record." + }, + { + "name": "created", + "type": "long", + "doc": "a time stamp." + } + ] + }, + "doc": "nested record" + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ea5047ea/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/record.avsc ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/record.avsc b/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/record.avsc new file mode 100644 index 0000000..eeb12dd --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/test/resources/converter/record.avsc @@ -0,0 +1,17 @@ +{ + "type": "record", + "name": "SimpleRecord", + "namespace": "org.apache.gobblin.test", + "fields": [ + { + "name": "created", + "type": "long", + "doc": "a time stamp." + }, + { + "name": "id", + "type": "string", + "doc": "ID of the record." + } + ] +} \ No newline at end of file
