Repository: nifi Updated Branches: refs/heads/master 08189596d -> d3b167481
NIFI-5805: Pool the BinaryEncoders used by the WriteAvroResultWithExternalSchema writer. Unfortunately, the writer that embeds schemas does not allow for this optimization due to the Avro API. This closes #3160. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d3b16748 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d3b16748 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d3b16748 Branch: refs/heads/master Commit: d3b16748139efe78373ca22ed116b0c7ed5dbbe3 Parents: 0818959 Author: Mark Payne <marka...@hotmail.com> Authored: Thu Nov 8 14:54:05 2018 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Mon Nov 12 09:32:12 2018 -0500 ---------------------------------------------------------------------- .../apache/nifi/avro/AvroRecordSetWriter.java | 69 ++++++++++++++------ .../avro/WriteAvroResultWithExternalSchema.java | 36 +++++++--- .../apache/nifi/avro/TestWriteAvroResult.java | 40 ++++++------ .../avro/TestWriteAvroResultWithoutSchema.java | 61 +++++++++++++++-- 4 files changed, 153 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d3b16748/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index 93d36dc..487b51f 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -17,27 +17,22 @@ package org.apache.nifi.avro; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.EnumSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; - import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; +import org.apache.avro.io.BinaryEncoder; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; @@ -48,6 +43,17 @@ import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.EnumSet; 
+import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + @Tags({"avro", "result", "set", "writer", "serializer", "record", "recordset", "row"}) @CapabilityDescription("Writes the contents of a RecordSet in Binary Avro format.") public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory { @@ -61,7 +67,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement LZO } - private static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder() + private static final PropertyDescriptor COMPRESSION_FORMAT = new Builder() .name("compression-format") .displayName("Compression Format") .description("Compression type to use when writing Avro files. Default is None.") @@ -70,19 +76,33 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement .required(true) .build(); - private LoadingCache<String, Schema> compiledAvroSchemaCache; + static final PropertyDescriptor ENCODER_POOL_SIZE = new Builder() + .name("encoder-pool-size") + .displayName("Encoder Pool Size") + .description("Avro Writers require the use of an Encoder. Creation of Encoders is expensive, but once created, they can be reused. This property controls the maximum number of Encoders that" + + " can be pooled and reused. Setting this value too small can result in degraded performance, but setting it higher can result in more heap being used. 
This property is ignored if the" + + " Avro Writer is configured with a Schema Write Strategy of 'Embed Avro Schema'.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .defaultValue("32") + .build(); static final AllowableValue AVRO_EMBEDDED = new AllowableValue("avro-embedded", "Embed Avro Schema", "The FlowFile will have the Avro schema embedded into the content, as is typical with Avro"); static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() - .name("cache-size") - .displayName("Cache Size") - .description("Specifies how many Schemas should be cached") - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .defaultValue("1000") - .required(true) - .build(); + .name("cache-size") + .displayName("Cache Size") + .description("Specifies how many Schemas should be cached") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("1000") + .required(true) + .build(); + + private LoadingCache<String, Schema> compiledAvroSchemaCache; + private volatile BlockingQueue<BinaryEncoder> encoderPool; + @OnEnabled public void onEnabled(final ConfigurationContext context) { @@ -90,6 +110,16 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement compiledAvroSchemaCache = Caffeine.newBuilder() .maximumSize(cacheSize) .build(schemaText -> new Schema.Parser().parse(schemaText)); + + final int capacity = context.getProperty(ENCODER_POOL_SIZE).evaluateAttributeExpressions().asInteger(); + encoderPool = new LinkedBlockingQueue<>(capacity); + } + + @OnDisabled + public void cleanup() { + if (encoderPool != null) { + encoderPool.clear(); + } } @Override @@ -117,7 +147,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement if (AVRO_EMBEDDED.getValue().equals(strategyValue)) { return new WriteAvroResultWithSchema(avroSchema, out, 
getCodecFactory(compressionFormat)); } else { - return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out); + return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out, encoderPool, getLogger()); } } catch (final SchemaNotFoundException e) { throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e); @@ -146,6 +176,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); properties.add(COMPRESSION_FORMAT); properties.add(CACHE_SIZE); + properties.add(ENCODER_POOL_SIZE); return properties; } http://git-wip-us.apache.org/repos/asf/nifi/blob/d3b16748/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java index 8464e45..a50efa0 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java @@ -17,22 +17,24 @@ package org.apache.nifi.avro; -import 
java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Map; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.EncoderFactory; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaAccessWriter; import org.apache.nifi.serialization.AbstractRecordSetWriter; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter { private final SchemaAccessWriter schemaAccessWriter; private final RecordSchema recordSchema; @@ -40,17 +42,26 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter { private final BinaryEncoder encoder; private final OutputStream buffered; private final DatumWriter<GenericRecord> datumWriter; + private final BlockingQueue<BinaryEncoder> recycleQueue; - public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema, - final SchemaAccessWriter schemaAccessWriter, final OutputStream out) throws IOException { + public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccessWriter, + final OutputStream out, final BlockingQueue<BinaryEncoder> recycleQueue, final ComponentLog logger) { super(out); this.recordSchema = recordSchema; this.schemaAccessWriter = schemaAccessWriter; this.avroSchema = avroSchema; this.buffered = new BufferedOutputStream(out); + this.recycleQueue = recycleQueue; + + BinaryEncoder reusableEncoder = recycleQueue.poll(); + if (reusableEncoder == 
null) { + logger.debug("Was not able to obtain a BinaryEncoder from reuse pool. This is normal for the first X number of iterations (where X is equal to the max size of the pool), " + + "but if this continues, it indicates that increasing the size of the pool will likely yield better performance for this Avro Writer."); + } + + encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, reusableEncoder); datumWriter = new GenericDatumWriter<>(avroSchema); - encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, null); } @Override @@ -88,4 +99,13 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter { public String getMimeType() { return "application/avro-binary"; } + + @Override + public void close() throws IOException { + if (encoder != null) { + recycleQueue.offer(encoder); + } + + super.close(); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/d3b16748/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java index 8c20ba7..4751f74 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java @@ -17,17 +17,30 @@ package org.apache.nifi.avro; -import static 
org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData.Array; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.math.BigDecimal; import java.io.OutputStream; +import java.math.BigDecimal; import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Time; @@ -43,22 +56,9 @@ import java.util.Map; import java.util.Objects; import java.util.TimeZone; -import org.apache.avro.Conversions; -import org.apache.avro.LogicalType; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData.Array; -import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.serialization.RecordSetWriter; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.WriteResult; -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; -import 
org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.RecordSet; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public abstract class TestWriteAvroResult { http://git-wip-us.apache.org/repos/asf/nifi/blob/d3b16748/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java index 33c0857..c592df0 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java @@ -17,26 +17,49 @@ package org.apache.nifi.avro; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Map; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DecoderFactory; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.NopSchemaAccessWriter; import 
org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy; import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.NullOutputStream; +import org.apache.nifi.util.MockComponentLog; import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult { + private final BlockingQueue<BinaryEncoder> encoderPool = new LinkedBlockingQueue<>(32); + @Override protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException { - return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out); + return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out, encoderPool, + new MockComponentLog("id", new Object())); } @Override @@ -55,4 +78,30 @@ public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult { new Schema.Parser().parse(schemaText); } + + @Test + @Ignore("This test takes many seconds to run and is only really useful for comparing performance of the writer before and after changes, so it is 
@Ignored, but left in place to be run manually " + + "for performance comparisons before & after changes are made.") + public void testPerf() throws IOException { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); + final RecordSchema recordSchema = new SimpleRecordSchema(fields); + + final OutputStream out = new NullOutputStream(); + + final Record record = new MapRecord(recordSchema, Collections.singletonMap("name", "John Doe")); + final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema); + + final ComponentLog logger = new MockComponentLog("id", new Object()); + + final long start = System.nanoTime(); + for (int i=0; i < 10_000_000; i++) { + try (final RecordSetWriter writer = new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, new NopSchemaAccessWriter(), out, encoderPool, logger)) { + writer.write(RecordSet.of(record.getSchema(), record)); + } + } + + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + System.out.println(millis); + } }