Repository: nifi
Updated Branches:
  refs/heads/master 08189596d -> d3b167481


NIFI-5805: Pool the BinaryEncoders used by the
WriteAvroResultWithExternalSchema writer. Unfortunately, the writer that embeds
the schema (WriteAvroResultWithSchema) cannot use this optimization because of
limitations in the Avro API.

This closes #3160.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d3b16748
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d3b16748
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d3b16748

Branch: refs/heads/master
Commit: d3b16748139efe78373ca22ed116b0c7ed5dbbe3
Parents: 0818959
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Nov 8 14:54:05 2018 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Mon Nov 12 09:32:12 2018 -0500

----------------------------------------------------------------------
 .../apache/nifi/avro/AvroRecordSetWriter.java   | 69 ++++++++++++++------
 .../avro/WriteAvroResultWithExternalSchema.java | 36 +++++++---
 .../apache/nifi/avro/TestWriteAvroResult.java   | 40 ++++++------
 .../avro/TestWriteAvroResultWithoutSchema.java  | 61 +++++++++++++++--
 4 files changed, 153 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d3b16748/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index 93d36dc..487b51f 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -17,27 +17,22 @@
 
 package org.apache.nifi.avro;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
+import org.apache.avro.io.BinaryEncoder;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -48,6 +43,17 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 @Tags({"avro", "result", "set", "writer", "serializer", "record", "recordset", 
"row"})
 @CapabilityDescription("Writes the contents of a RecordSet in Binary Avro 
format.")
 public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter 
implements RecordSetWriterFactory {
@@ -61,7 +67,7 @@ public class AvroRecordSetWriter extends 
SchemaRegistryRecordSetWriter implement
         LZO
     }
 
-    private static final PropertyDescriptor COMPRESSION_FORMAT = new 
PropertyDescriptor.Builder()
+    private static final PropertyDescriptor COMPRESSION_FORMAT = new Builder()
         .name("compression-format")
         .displayName("Compression Format")
         .description("Compression type to use when writing Avro files. Default 
is None.")
@@ -70,19 +76,33 @@ public class AvroRecordSetWriter extends 
SchemaRegistryRecordSetWriter implement
         .required(true)
         .build();
 
-    private LoadingCache<String, Schema> compiledAvroSchemaCache;
+    static final PropertyDescriptor ENCODER_POOL_SIZE = new Builder()
+        .name("encoder-pool-size")
+        .displayName("Encoder Pool Size")
+        .description("Avro Writers require the use of an Encoder. Creation of 
Encoders is expensive, but once created, they can be reused. This property 
controls the maximum number of Encoders that" +
+            " can be pooled and reused. Setting this value too small can 
result in degraded performance, but setting it higher can result in more heap 
being used. This property is ignored if the" +
+            " Avro Writer is configured with a Schema Write Strategy of 'Embed 
Avro Schema'.")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .defaultValue("32")
+        .build();
 
     static final AllowableValue AVRO_EMBEDDED = new 
AllowableValue("avro-embedded", "Embed Avro Schema",
         "The FlowFile will have the Avro schema embedded into the content, as 
is typical with Avro");
 
     static final PropertyDescriptor CACHE_SIZE = new 
PropertyDescriptor.Builder()
-            .name("cache-size")
-            .displayName("Cache Size")
-            .description("Specifies how many Schemas should be cached")
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .defaultValue("1000")
-            .required(true)
-            .build();
+        .name("cache-size")
+        .displayName("Cache Size")
+        .description("Specifies how many Schemas should be cached")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .defaultValue("1000")
+        .required(true)
+        .build();
+
+    private LoadingCache<String, Schema> compiledAvroSchemaCache;
+    private volatile BlockingQueue<BinaryEncoder> encoderPool;
+
 
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) {
@@ -90,6 +110,16 @@ public class AvroRecordSetWriter extends 
SchemaRegistryRecordSetWriter implement
         compiledAvroSchemaCache = Caffeine.newBuilder()
                 .maximumSize(cacheSize)
                 .build(schemaText -> new Schema.Parser().parse(schemaText));
+
+        final int capacity = 
context.getProperty(ENCODER_POOL_SIZE).evaluateAttributeExpressions().asInteger();
+        encoderPool = new LinkedBlockingQueue<>(capacity);
+    }
+
+    @OnDisabled
+    public void cleanup() {
+        if (encoderPool != null) {
+            encoderPool.clear();
+        }
     }
 
     @Override
@@ -117,7 +147,7 @@ public class AvroRecordSetWriter extends 
SchemaRegistryRecordSetWriter implement
             if (AVRO_EMBEDDED.getValue().equals(strategyValue)) {
                 return new WriteAvroResultWithSchema(avroSchema, out, 
getCodecFactory(compressionFormat));
             } else {
-                return new WriteAvroResultWithExternalSchema(avroSchema, 
recordSchema, getSchemaAccessWriter(recordSchema), out);
+                return new WriteAvroResultWithExternalSchema(avroSchema, 
recordSchema, getSchemaAccessWriter(recordSchema), out, encoderPool, 
getLogger());
             }
         } catch (final SchemaNotFoundException e) {
             throw new ProcessException("Could not determine the Avro Schema to 
use for writing the content", e);
@@ -146,6 +176,7 @@ public class AvroRecordSetWriter extends 
SchemaRegistryRecordSetWriter implement
         final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
         properties.add(COMPRESSION_FORMAT);
         properties.add(CACHE_SIZE);
+        properties.add(ENCODER_POOL_SIZE);
         return properties;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d3b16748/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
index 8464e45..a50efa0 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
@@ -17,22 +17,24 @@
 
 package org.apache.nifi.avro;
 
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Map;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
 public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter 
{
     private final SchemaAccessWriter schemaAccessWriter;
     private final RecordSchema recordSchema;
@@ -40,17 +42,26 @@ public class WriteAvroResultWithExternalSchema extends 
AbstractRecordSetWriter {
     private final BinaryEncoder encoder;
     private final OutputStream buffered;
     private final DatumWriter<GenericRecord> datumWriter;
+    private final BlockingQueue<BinaryEncoder> recycleQueue;
 
-    public WriteAvroResultWithExternalSchema(final Schema avroSchema, final 
RecordSchema recordSchema,
-        final SchemaAccessWriter schemaAccessWriter, final OutputStream out) 
throws IOException {
+    public WriteAvroResultWithExternalSchema(final Schema avroSchema, final 
RecordSchema recordSchema, final SchemaAccessWriter schemaAccessWriter,
+                                             final OutputStream out, final 
BlockingQueue<BinaryEncoder> recycleQueue, final ComponentLog logger) {
         super(out);
         this.recordSchema = recordSchema;
         this.schemaAccessWriter = schemaAccessWriter;
         this.avroSchema = avroSchema;
         this.buffered = new BufferedOutputStream(out);
+        this.recycleQueue = recycleQueue;
+
+        BinaryEncoder reusableEncoder = recycleQueue.poll();
+        if (reusableEncoder == null) {
+            logger.debug("Was not able to obtain a BinaryEncoder from reuse 
pool. This is normal for the first X number of iterations (where X is equal to 
the max size of the pool), " +
+                "but if this continues, it indicates that increasing the size 
of the pool will likely yield better performance for this Avro Writer.");
+        }
+
+        encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, 
reusableEncoder);
 
         datumWriter = new GenericDatumWriter<>(avroSchema);
-        encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, null);
     }
 
     @Override
@@ -88,4 +99,25 @@ public class WriteAvroResultWithExternalSchema extends 
AbstractRecordSetWriter {
     public String getMimeType() {
         return "application/avro-binary";
     }
+
+    /**
+     * Flushes this writer's encoder, returns it to the shared pool so another writer can
+     * reuse it, and then closes the underlying stream via the superclass.
+     * The flush must come first: an encoder that still holds unwritten bytes would lose
+     * them from this writer's output, and handing it to another writer could cause Avro to
+     * push them to this (closed) stream when the encoder is reconfigured.
+     * NOTE(review): nothing prevents close() from running twice, which would offer the same
+     * encoder to the pool twice and let two writers share it; consider a 'closed' guard.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            // If flush() throws, the encoder is simply not recycled (its state is unknown).
+            encoder.flush();
+            recycleQueue.offer(encoder);
+        } finally {
+            // Release the underlying stream even when flushing fails.
+            super.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/d3b16748/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index 8c20ba7..4751f74 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -17,17 +17,30 @@
 
 package org.apache.nifi.avro;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.math.BigDecimal;
 import java.io.OutputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
@@ -43,22 +56,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.TimeZone;
 
-import org.apache.avro.Conversions;
-import org.apache.avro.LogicalType;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData.Array;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public abstract class TestWriteAvroResult {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d3b16748/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
index 33c0857..c592df0 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
@@ -17,26 +17,49 @@
 
 package org.apache.nifi.avro;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
 import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy;
 import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.util.MockComponentLog;
 import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult {
 
+    private final BlockingQueue<BinaryEncoder> encoderPool = new 
LinkedBlockingQueue<>(32);
+
     @Override
     protected RecordSetWriter createWriter(final Schema schema, final 
OutputStream out) throws IOException {
-        return new WriteAvroResultWithExternalSchema(schema, 
AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out);
+        return new WriteAvroResultWithExternalSchema(schema, 
AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out, 
encoderPool,
+            new MockComponentLog("id", new Object()));
     }
 
     @Override
@@ -55,4 +78,30 @@ public class TestWriteAvroResultWithoutSchema extends 
TestWriteAvroResult {
         new Schema.Parser().parse(schemaText);
     }
 
+
+    @Test
+    @Ignore("This test takes many seconds to run and is only really useful for 
comparing performance of the writer before and after changes, so it is 
@Ignored, but left in place to be run manually " +
+        "for performance comparisons before & after changes are made.")
+    public void testPerf() throws IOException {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        final OutputStream out = new NullOutputStream();
+
+        final Record record = new MapRecord(recordSchema, 
Collections.singletonMap("name", "John Doe"));
+        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
+
+        final ComponentLog logger = new MockComponentLog("id", new Object());
+
+        final long start = System.nanoTime();
+        for (int i=0; i < 10_000_000; i++) {
+            try (final RecordSetWriter writer = new 
WriteAvroResultWithExternalSchema(avroSchema, recordSchema, new 
NopSchemaAccessWriter(), out, encoderPool, logger)) {
+                writer.write(RecordSet.of(record.getSchema(), record));
+            }
+        }
+
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
+        System.out.println(millis);
+    }
 }

Reply via email to