Repository: incubator-gobblin Updated Branches: refs/heads/master a9c9f781f -> becb2b786
[GOBBLIN-283] Refactor EnvelopePayloadConverter to support multi fields conversion Closes #2136 from zxcware/envref Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/becb2b78 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/becb2b78 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/becb2b78 Branch: refs/heads/master Commit: becb2b78600cabba01ce2fb8d3d039dc283c458e Parents: a9c9f78 Author: zhchen <[email protected]> Authored: Wed Oct 11 09:57:07 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Oct 11 09:57:07 2017 -0700 ---------------------------------------------------------------------- .../converter/BaseEnvelopeSchemaConverter.java | 50 +++++++++++++++++--- .../converter/EnvelopePayloadConverter.java | 44 +++++++++++------ 2 files changed, 73 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/becb2b78/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java index d220902..be59c2a 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java @@ -36,10 +36,12 @@ import com.google.common.base.Optional; * Base class for an envelope schema converter using {@link KafkaSchemaRegistry} */ public abstract class BaseEnvelopeSchemaConverter<P> extends Converter<Schema, Schema, 
GenericRecord, GenericRecord> { - public static final String PAYLOAD_SCHEMA_ID_FIELD = "converter.envelopeSchemaConverter.schemaIdField"; - public static final String PAYLOAD_FIELD = "converter.envelopeSchemaConverter.payloadField"; - public static final String PAYLOAD_SCHEMA_TOPIC = "converter.envelopeSchemaConverter.payloadSchemaTopic"; - public static final String KAFKA_REGISTRY_FACTORY = "converter.envelopeSchemaConverter.kafkaRegistryFactory"; + public static final String CONF_PREFIX = "converter.envelopeSchemaConverter."; + + public static final String PAYLOAD_SCHEMA_ID_FIELD = CONF_PREFIX + "schemaIdField"; + public static final String PAYLOAD_FIELD = CONF_PREFIX + "payloadField"; + public static final String PAYLOAD_SCHEMA_TOPIC = CONF_PREFIX + "payloadSchemaTopic"; + public static final String KAFKA_REGISTRY_FACTORY = CONF_PREFIX + "kafkaRegistryFactory"; public static final String DEFAULT_PAYLOAD_FIELD = "payload"; public static final String DEFAULT_PAYLOAD_SCHEMA_ID_FIELD = "payloadSchemaId"; @@ -81,12 +83,25 @@ public abstract class BaseEnvelopeSchemaConverter<P> extends Converter<Schema, S * * @param inputRecord the input record which has the payload * @return the current schema of the payload + * @deprecated use {@link #getFieldSchema(GenericRecord, String)} */ + @Deprecated protected Schema getPayloadSchema(GenericRecord inputRecord) throws Exception { - Optional<Object> schemaIdValue = AvroUtils.getFieldValue(inputRecord, payloadSchemaIdField); + return getFieldSchema(inputRecord, payloadSchemaIdField); + } + + /** + * Get the schema of a field + * + * @param record the input record which has the schema id + * @param schemaIdLocation a dot separated location string to the schema id + * @return a schema referenced by the schema id + */ + protected Schema getFieldSchema(GenericRecord record, String schemaIdLocation) throws Exception { + Optional<Object> schemaIdValue = AvroUtils.getFieldValue(record, schemaIdLocation); if (!schemaIdValue.isPresent()) { -
throw new Exception("Schema id with key " + payloadSchemaIdField + " not found in the record"); + throw new Exception("Schema id with key " + schemaIdLocation + " not found in the record"); } String schemaKey = String.valueOf(schemaIdValue.get()); return (Schema) registry.getSchemaByKey(schemaKey); @@ -97,9 +112,30 @@ public abstract class BaseEnvelopeSchemaConverter<P> extends Converter<Schema, S * * @param inputRecord the input record which has the payload * @return the byte array of the payload in the input record + * @deprecated use {@link #getFieldAsBytes(GenericRecord, String)} */ + @Deprecated protected byte[] getPayloadBytes(GenericRecord inputRecord) { - ByteBuffer bb = (ByteBuffer) inputRecord.get(payloadField); + try { + return getFieldAsBytes(inputRecord, payloadField); + } catch (Exception e) { + return null; + } + } + + /** + * Get field value byte array + * + * @param record the input record which has the field + * @param fieldLocation a dot separated location string to the field + * @return the byte array of field value + */ + protected byte[] getFieldAsBytes(GenericRecord record, String fieldLocation) throws Exception { + Optional<Object> bytesValue = AvroUtils.getFieldValue(record, fieldLocation); + if (!bytesValue.isPresent()) { + throw new Exception("Bytes value with key " + fieldLocation + " not found in the record"); + } + ByteBuffer bb = (ByteBuffer) bytesValue.get(); if (bb.hasArray()) { return bb.array(); } else { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/becb2b78/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java index ca63ac8..6408a4c 100644 --- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java @@ -32,7 +32,7 @@ import org.apache.gobblin.configuration.WorkUnitState; * * <p> Given an envelope schema as the input schema, the output schema will have the payload * field, configured by key {@value PAYLOAD_FIELD}, set with its latest schema fetched from a - * {@link #registry} (see {@code createDecoratedField(Field)}). The converter copies the other fields + * {@link #registry} (see {@code createLatestPayloadField(Field)}). The converter copies the other fields * from the input schema to the output schema * * <p> Given an envelope record as the input record, the output record will have the payload set @@ -51,13 +51,7 @@ public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<Generi throws SchemaConversionException { List<Field> outputSchemaFields = new ArrayList<>(); for (Field field : inputSchema.getFields()) { - if (field.name().equals(payloadField)) { - // Decorate the field with full schema - outputSchemaFields.add(createDecoratedField(field)); - } else { - // Make a copy of the field to the output schema - outputSchemaFields.add(new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order())); - } + outputSchemaFields.add(convertFieldSchema(inputSchema, field, workUnit)); } Schema outputSchema = Schema @@ -67,12 +61,26 @@ public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<Generi } /** + * Convert to the output schema of a field + */ + protected Field convertFieldSchema(Schema inputSchema, Field field, WorkUnitState workUnit) + throws SchemaConversionException { + if (field.name().equals(payloadField)) { + // Create a payload field with latest schema + return createLatestPayloadField(field); + } + // Make a copy of the field to the output schema + return new 
Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order()); + } + + /** * Create a payload field with its latest schema fetched from {@link #registry} * * @param field the original payload field from input envelope schema * @return a new payload field with its latest schema */ - private Field createDecoratedField(Field field) throws SchemaConversionException { + private Field createLatestPayloadField(Field field) + throws SchemaConversionException { try { Schema payloadSchema = fetchLatestPayloadSchema(); return new Field(field.name(), payloadSchema, DECORATED_PAYLOAD_DOC, field.defaultValue(), field.order()); @@ -86,12 +94,20 @@ public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<Generi throws DataConversionException { GenericRecord outputRecord = new GenericData.Record(outputSchema); for (Field field : inputRecord.getSchema().getFields()) { - if (field.name().equals(payloadField)) { - outputRecord.put(payloadField, upConvertPayload(inputRecord)); - } else { - outputRecord.put(field.name(), inputRecord.get(field.name())); - } + outputRecord.put(field.name(), convertFieldValue(outputSchema, field, inputRecord, workUnit)); } return new SingleRecordIterable<>(outputRecord); } + + /** + * Convert to the output value of a field + */ + protected Object convertFieldValue(Schema outputSchema, Field field, GenericRecord inputRecord, + WorkUnitState workUnit) + throws DataConversionException { + if (field.name().equals(payloadField)) { + return upConvertPayload(inputRecord); + } + return inputRecord.get(field.name()); + } }
