Repository: nifi
Updated Branches:
  refs/heads/master 91ed96f8c -> 68b42c9e5
NIFI-4055: Add a compression option to AvroRecordSetWriter

This closes #1909.

Signed-off-by: Pierre Villard <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/68b42c9e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/68b42c9e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/68b42c9e

Branch: refs/heads/master
Commit: 68b42c9e547608d78d301d5b3a3856fe4853b4cd
Parents: 91ed96f
Author: Steve Champagne <[email protected]>
Authored: Fri Jun 9 18:39:16 2017 +0000
Committer: Pierre Villard <[email protected]>
Committed: Mon Jun 26 19:16:31 2017 +0200

----------------------------------------------------------------------
 .../apache/nifi/avro/AvroRecordSetWriter.java | 66 +++++++++++++++++++-
 .../nifi/avro/WriteAvroResultWithSchema.java | 4 +-
 .../avro/TestWriteAvroResultWithSchema.java | 3 +-
 3 files changed, 70 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/68b42c9e/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index fd09961..a8459a9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -20,6 +20,7 @@ package org.apache.nifi.avro; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.List; @@ -28,10 +29,13 @@ import java.util.Optional; import java.util.Set; import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; @@ -48,6 +52,23 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); private static final int MAX_AVRO_SCHEMA_CACHE_SIZE = 20; + private enum CodecType { + BZIP2, + DEFLATE, + NONE, + SNAPPY, + LZO + } + + private static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder() + .name("compression-format") + .displayName("Compression Format") + .description("Compression type to use when writing Avro files. 
Default is None.") + .allowableValues(CodecType.values()) + .defaultValue(CodecType.NONE.toString()) + .required(true) + .build(); + private final Map<String, Schema> compiledAvroSchemaCache = new LinkedHashMap<String, Schema>() { @Override protected boolean removeEldestEntry(final Map.Entry<String, Schema> eldest) { @@ -61,6 +82,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement @Override public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final FlowFile flowFile, final OutputStream out) throws IOException { final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue(); + final String compressionFormat = getConfigurationContext().getProperty(COMPRESSION_FORMAT).getValue(); try { final Schema avroSchema; @@ -80,7 +102,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement } if (AVRO_EMBEDDED.getValue().equals(strategyValue)) { - return new WriteAvroResultWithSchema(avroSchema, out); + return new WriteAvroResultWithSchema(avroSchema, out, getCodecFactory(compressionFormat)); } else { return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out); } @@ -113,6 +135,30 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement } } + private CodecFactory getCodecFactory(String property) { + CodecType type = CodecType.valueOf(property); + switch (type) { + case BZIP2: + return CodecFactory.bzip2Codec(); + case DEFLATE: + return CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL); + case LZO: + return CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL); + case SNAPPY: + return CodecFactory.snappyCodec(); + case NONE: + default: + return CodecFactory.nullCodec(); + } + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(COMPRESSION_FORMAT); + return properties; + } + @Override protected List<AllowableValue> getSchemaWriteStrategyValues() { final List<AllowableValue> allowableValues = new ArrayList<>(); @@ -135,4 +181,22 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement return super.getRequiredSchemaFields(validationContext); } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext)); + final String writeStrategyValue = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue(); + final String compressionFormatValue = validationContext.getProperty(COMPRESSION_FORMAT).getValue(); + if (!writeStrategyValue.equalsIgnoreCase(AVRO_EMBEDDED.getValue()) + && !CodecType.NONE.toString().equals(compressionFormatValue)) { + results.add(new ValidationResult.Builder() + .subject(COMPRESSION_FORMAT.getName()) + .valid(false) + .explanation("Avro compression codecs are stored in the header of the Avro file and therefore " + + "requires the header to be embedded into the content.") + .build()); + } + + return results; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/68b42c9e/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java index 
ae2f109..ea327a4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Map; import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; @@ -34,12 +35,13 @@ public class WriteAvroResultWithSchema extends AbstractRecordSetWriter { private final DataFileWriter<GenericRecord> dataFileWriter; private final Schema schema; - public WriteAvroResultWithSchema(final Schema schema, final OutputStream out) throws IOException { + public WriteAvroResultWithSchema(final Schema schema, final OutputStream out, final CodecFactory codec) throws IOException { super(out); this.schema = schema; final GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); dataFileWriter = new DataFileWriter<>(datumWriter); + dataFileWriter.setCodec(codec); dataFileWriter.create(schema, out); } http://git-wip-us.apache.org/repos/asf/nifi/blob/68b42c9e/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java index 9761076..b3eecde 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.OutputStream; import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.StringType; @@ -33,7 +34,7 @@ public class TestWriteAvroResultWithSchema extends TestWriteAvroResult { @Override protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException { - return new WriteAvroResultWithSchema(schema, out); + return new WriteAvroResultWithSchema(schema, out, CodecFactory.nullCodec()); } @Override
