Repository: nifi
Updated Branches:
  refs/heads/master a628aced6 -> 765df6781


NIFI-5757 Using Caffeine instead of slow synchronization on LinkedHashMap for 
caches - mainly avro schema caches

This closes #3111.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/765df678
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/765df678
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/765df678

Branch: refs/heads/master
Commit: 765df6781782d0988e958fb69d9825797a086fe7
Parents: a628ace
Author: Arek Burdach <[email protected]>
Authored: Fri Oct 26 16:49:50 2018 +0200
Committer: Mark Payne <[email protected]>
Committed: Fri Nov 9 14:50:24 2018 -0500

----------------------------------------------------------------------
 nifi-commons/nifi-record-path/pom.xml           |   5 +
 .../nifi/record/path/util/RecordPathCache.java  |  36 ++-----
 .../client/CachingSchemaRegistryClient.java     | 104 +++----------------
 .../nifi-avro-record-utils/pom.xml              |   5 +
 .../WriteAvroSchemaAttributeStrategy.java       |  32 ++----
 .../nifi-jolt-record-processors/pom.xml         |   5 +
 .../jolt/record/JoltTransformRecord.java        |  73 +++++--------
 .../processors/standard/ConvertJSONToSQL.java   |  61 ++++++-----
 .../processors/standard/JoltTransformJSON.java  |  90 ++++++----------
 .../processors/standard/PutDatabaseRecord.java  |  48 ++++-----
 .../nifi-record-serialization-services/pom.xml  |   5 +
 .../java/org/apache/nifi/avro/AvroReader.java   |  62 ++++++-----
 .../apache/nifi/avro/AvroRecordSetWriter.java   |  59 +++++------
 .../nifi-update-attribute-processor/pom.xml     |   5 +
 .../processors/attributes/UpdateAttribute.java  |  65 ++++++------
 15 files changed, 253 insertions(+), 402 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-commons/nifi-record-path/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/pom.xml 
b/nifi-commons/nifi-record-path/pom.xml
index 364654c..29a5984 100644
--- a/nifi-commons/nifi-record-path/pom.xml
+++ b/nifi-commons/nifi-record-path/pom.xml
@@ -71,5 +71,10 @@
             <artifactId>antlr-runtime</artifactId>
             <version>3.5.2</version>
         </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.6.2</version>
+        </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java
 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java
index 243ad11..8cc9e24 100644
--- 
a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java
+++ 
b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java
@@ -17,42 +17,20 @@
 
 package org.apache.nifi.record.path.util;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
-
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.nifi.record.path.RecordPath;
 
 public class RecordPathCache {
-    private final Map<String, RecordPath> compiledRecordPaths;
+    private final LoadingCache<String, RecordPath> compiledRecordPaths;
 
     public RecordPathCache(final int cacheSize) {
-        compiledRecordPaths = new LinkedHashMap<String, RecordPath>() {
-            @Override
-            protected boolean removeEldestEntry(final Map.Entry<String, 
RecordPath> eldest) {
-                return size() >= cacheSize;
-            }
-        };
+        compiledRecordPaths = Caffeine.newBuilder()
+                .maximumSize(cacheSize)
+                .build(RecordPath::compile);
     }
 
     public RecordPath getCompiled(final String path) {
-        RecordPath compiled;
-        synchronized (this) {
-            compiled = compiledRecordPaths.get(path);
-        }
-
-        if (compiled != null) {
-            return compiled;
-        }
-
-        compiled = RecordPath.compile(path);
-
-        synchronized (this) {
-            final RecordPath existing = compiledRecordPaths.putIfAbsent(path, 
compiled);
-            if (existing != null) {
-                compiled = existing;
-            }
-        }
-
-        return compiled;
+        return compiledRecordPaths.get(path);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java
 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java
index d82befe..9075ac2 100644
--- 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java
+++ 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java
@@ -17,113 +17,43 @@
 
 package org.apache.nifi.confluent.schemaregistry.client;
 
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.IOException;
+import java.time.Duration;
+
 
 public class CachingSchemaRegistryClient implements SchemaRegistryClient {
     private final SchemaRegistryClient client;
-    private final long expirationNanos;
 
-    private final Map<String, CachedRecordSchema> nameCache;
-    private final Map<Integer, CachedRecordSchema> idCache;
+    private final LoadingCache<String, RecordSchema> nameCache;
+    private final LoadingCache<Integer, RecordSchema> idCache;
 
 
     public CachingSchemaRegistryClient(final SchemaRegistryClient toWrap, 
final int cacheSize, final long expirationNanos) {
         this.client = toWrap;
-        this.expirationNanos = expirationNanos;
 
-        nameCache = new Cache<>(cacheSize);
-        idCache = new Cache<>(cacheSize);
+        nameCache = Caffeine.newBuilder()
+                .maximumSize(cacheSize)
+                .expireAfterWrite(Duration.ofNanos(expirationNanos))
+                .build(client::getSchema);
+        idCache = Caffeine.newBuilder()
+                .maximumSize(cacheSize)
+                .expireAfterWrite(Duration.ofNanos(expirationNanos))
+                .build(client::getSchema);
     }
 
     @Override
     public RecordSchema getSchema(final String schemaName) throws IOException, 
SchemaNotFoundException {
-        RecordSchema schema = getFromCache(nameCache, schemaName);
-        if (schema != null) {
-            return schema;
-        }
-
-        schema = client.getSchema(schemaName);
-
-        synchronized (nameCache) {
-            nameCache.put(schemaName, new CachedRecordSchema(schema));
-        }
-
-        return schema;
+        return nameCache.get(schemaName);
     }
 
     @Override
     public RecordSchema getSchema(final int schemaId) throws IOException, 
SchemaNotFoundException {
-        RecordSchema schema = getFromCache(idCache, schemaId);
-        if (schema != null) {
-            return schema;
-        }
-
-        schema = client.getSchema(schemaId);
-
-        synchronized (idCache) {
-            idCache.put(schemaId, new CachedRecordSchema(schema));
-        }
-
-        return schema;
-    }
-
-    private RecordSchema getFromCache(final Map<?, CachedRecordSchema> cache, 
final Object key) {
-        final CachedRecordSchema cachedSchema;
-        synchronized (cache) {
-            cachedSchema = cache.get(key);
-        }
-
-        if (cachedSchema == null) {
-            return null;
-        }
-
-        if (cachedSchema.isOlderThan(System.nanoTime() - expirationNanos)) {
-            return null;
-        }
-
-        return cachedSchema.getSchema();
-    }
-
-
-    private static class Cache<K, V> extends LinkedHashMap<K, V> {
-        private final int cacheSize;
-
-        public Cache(final int cacheSize) {
-            this.cacheSize = cacheSize;
-        }
-
-        @Override
-        protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
-            return size() >= cacheSize;
-        }
+        return idCache.get(schemaId);
     }
 
-
-    private static class CachedRecordSchema {
-        private final RecordSchema schema;
-        private final long cachedTimestamp;
-
-        public CachedRecordSchema(final RecordSchema schema) {
-            this(schema, System.nanoTime());
-        }
-
-        public CachedRecordSchema(final RecordSchema schema, final long 
timestamp) {
-            this.schema = schema;
-            this.cachedTimestamp = timestamp;
-        }
-
-        public RecordSchema getSchema() {
-            return schema;
-        }
-
-        public boolean isOlderThan(final long timestamp) {
-            return cachedTimestamp < timestamp;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
index 15ab69b..5a3c618 100755
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml
@@ -34,6 +34,11 @@
             <artifactId>avro</artifactId>
             <version>1.8.1</version>
         </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.6.2</version>
+        </dependency>
         <!-- Other modules using nifi-avro-record-utils are expected to have 
these APIs available, typically through a NAR dependency -->
         <dependency>
             <groupId>org.apache.nifi</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
index 36484a5..97389a9 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
@@ -17,7 +17,8 @@
 
 package org.apache.nifi.schema.access;
 
-import org.apache.avro.Schema;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.serialization.record.RecordSchema;
 
@@ -25,18 +26,14 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
 public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
-    private final Map<RecordSchema, String> avroSchemaTextCache = new 
LinkedHashMap<RecordSchema, String>() {
-        @Override
-        protected boolean removeEldestEntry(Map.Entry<RecordSchema, String> 
eldest) {
-            return size() > 10;
-        }
-    };
+    private final LoadingCache<RecordSchema, String> avroSchemaTextCache = 
Caffeine.newBuilder()
+            .maximumSize(10)
+            .build(schema -> 
AvroTypeUtil.extractAvroSchema(schema).toString());
 
     @Override
     public void writeHeader(final RecordSchema schema, final OutputStream out) 
throws IOException {
@@ -53,24 +50,7 @@ public class WriteAvroSchemaAttributeStrategy implements 
SchemaAccessWriter {
             }
         }
 
-        String schemaText;
-        synchronized (avroSchemaTextCache) {
-            schemaText = avroSchemaTextCache.get(schema);
-        }
-
-        if (schemaText == null) {
-            final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
-            schemaText = avroSchema.toString();
-
-            synchronized (avroSchemaTextCache) {
-                final String existing = 
avroSchemaTextCache.putIfAbsent(schema, schemaText);
-                if (existing != null) {
-                    schemaText = existing;
-                }
-            }
-        }
-
-        return Collections.singletonMap("avro.schema", schemaText);
+        return Collections.singletonMap("avro.schema", 
avroSchemaTextCache.get(schema));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml 
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml
index 75793e7..bfd6deb 100644
--- 
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml
+++ 
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml
@@ -53,6 +53,11 @@
             <artifactId>json-utils</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.6.2</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <version>1.9.0-SNAPSHOT</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
 
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
index 86b721f..6ce4161 100644
--- 
a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
+++ 
b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java
@@ -20,6 +20,8 @@ import com.bazaarvoice.jolt.ContextualTransform;
 import com.bazaarvoice.jolt.JoltTransform;
 import com.bazaarvoice.jolt.JsonUtils;
 import com.bazaarvoice.jolt.Transform;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -65,10 +67,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -190,18 +192,12 @@ public class JoltTransformRecord extends 
AbstractProcessor {
     private final static Set<Relationship> relationships;
     private final static String DEFAULT_CHARSET = "UTF-8";
 
-    // Cache is guarded by synchronizing on 'this'.
-    private volatile int maxTransformsToCache = 10;
-    private final Map<String, JoltTransform> transformCache = new 
LinkedHashMap<String, JoltTransform>() {
-        @Override
-        protected boolean removeEldestEntry(Map.Entry<String, JoltTransform> 
eldest) {
-            final boolean evict = size() > maxTransformsToCache;
-            if (evict) {
-                getLogger().debug("Removing Jolt Transform from cache because 
cache is full");
-            }
-            return evict;
-        }
-    };
+    /**
+     * It is a cache for transform objects. It keep values indexed by jolt 
specification string.
+     * For some cases the key could be empty. It means that it represents 
default transform (e.g. for custom transform
+     * when there is no jolt-record-spec specified).
+     */
+    private LoadingCache<Optional<String>, JoltTransform> transformCache;
 
     static {
         final List<PropertyDescriptor> _properties = new ArrayList<>();
@@ -352,25 +348,26 @@ public class JoltTransformRecord extends 
AbstractProcessor {
         }
     }
 
-    private JoltTransform getTransform(final ProcessContext context, final 
FlowFile flowFile) throws Exception {
-        final String specString;
+    private JoltTransform getTransform(final ProcessContext context, final 
FlowFile flowFile) {
+        final Optional<String> specString;
         if (context.getProperty(JOLT_SPEC).isSet()) {
-            specString = 
context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue();
+            specString = 
Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue());
         } else {
-            specString = null;
+            specString = Optional.empty();
         }
 
-        // Get the transform from our cache, if it exists.
-        JoltTransform transform;
-        synchronized (this) {
-            transform = transformCache.get(specString);
-        }
+        return transformCache.get(specString);
+    }
 
-        if (transform != null) {
-            return transform;
-        }
+    @OnScheduled
+    public void setup(final ProcessContext context) {
+        int maxTransformsToCache = 
context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build(specString -> createTransform(context, 
specString.orElse(null)));
+    }
 
-        // If no transform for our spec, create the transform.
+    private JoltTransform createTransform(final ProcessContext context, final 
String specString) throws Exception {
         final Object specJson;
         if (context.getProperty(JOLT_SPEC).isSet() && 
!SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
             specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
@@ -379,30 +376,10 @@ public class JoltTransformRecord extends 
AbstractProcessor {
         }
 
         if 
(CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
-            transform = 
TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(),
 context.getProperty(CUSTOM_CLASS).getValue(), specJson);
+            return 
TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(),
 context.getProperty(CUSTOM_CLASS).getValue(), specJson);
         } else {
-            transform = 
TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), 
context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
-        }
-
-        // Check again for the transform in our cache, since it's possible 
that another thread has
-        // already populated it. If absent from the cache, populate the cache. 
Otherwise, use the
-        // value from the cache.
-        synchronized (this) {
-            final JoltTransform existingTransform = 
transformCache.get(specString);
-            if (existingTransform == null) {
-                transformCache.put(specString, transform);
-            } else {
-                transform = existingTransform;
-            }
+            return 
TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), 
context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
         }
-
-        return transform;
-    }
-
-    @OnScheduled
-    public synchronized void setup(final ProcessContext context) {
-        transformCache.clear();
-        maxTransformsToCache = 
context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
     }
 
     protected FilenameFilter getJarFilenameFilter() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
index f198013..312c481 100755
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java
@@ -28,17 +28,17 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -216,6 +216,15 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             .defaultValue("sql")
             .build();
 
+    static final PropertyDescriptor TABLE_SCHEMA_CACHE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("table-schema-cache-size")
+            .displayName("Table Schema Cache Size")
+            .description("Specifies how many Table Schemas should be cached")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .required(true)
+            .build();
+
     static final Relationship REL_ORIGINAL = new Relationship.Builder()
             .name("original")
             .description("When a FlowFile is converted to SQL, the original 
JSON FlowFile is routed to this relationship")
@@ -230,14 +239,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
                     + "content or the JSON content missing a required field 
(if using an INSERT statement type).")
             .build();
 
-    private final Map<SchemaKey, TableSchema> schemaCache = new 
LinkedHashMap<SchemaKey, TableSchema>(100) {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        protected boolean removeEldestEntry(Map.Entry<SchemaKey,TableSchema> 
eldest) {
-            return size() >= 100;
-        }
-    };
+    private Cache<SchemaKey, TableSchema> schemaCache;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -254,6 +256,7 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         properties.add(QUOTED_IDENTIFIERS);
         properties.add(QUOTED_TABLE_IDENTIFIER);
         properties.add(SQL_PARAM_ATTR_PREFIX);
+        properties.add(TABLE_SCHEMA_CACHE_SIZE);
         return properties;
     }
 
@@ -270,14 +273,15 @@ public class ConvertJSONToSQL extends AbstractProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
-        synchronized (this) {
-            schemaCache.clear();
-        }
+        final int tableSchemaCacheSize = 
context.getProperty(TABLE_SCHEMA_CACHE_SIZE).asInteger();
+        schemaCache = Caffeine.newBuilder()
+                .maximumSize(tableSchemaCacheSize)
+                .build();
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        FlowFile flowFile = session.get();
+        final FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
         }
@@ -306,25 +310,20 @@ public class ConvertJSONToSQL extends AbstractProcessor {
         // Attribute prefix
         final String attributePrefix = 
context.getProperty(SQL_PARAM_ATTR_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
 
-        // get the database schema from the cache, if one exists. We do this 
in a synchronized block, rather than
-        // using a ConcurrentMap because the Map that we are using is a 
LinkedHashMap with a capacity such that if
-        // the Map grows beyond this capacity, old elements are evicted. We do 
this in order to avoid filling the
-        // Java Heap if there are a lot of different SQL statements being 
generated that reference different tables.
         TableSchema schema;
-        synchronized (this) {
-            schema = schemaCache.get(schemaKey);
-            if (schema == null) {
-                // No schema exists for this table yet. Query the database to 
determine the schema and put it into the cache.
+        try {
+            schema = schemaCache.get(schemaKey, key -> {
                 final DBCPService dbcpService = 
context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
-                try (final Connection conn = 
dbcpService.getConnection(flowFile == null ? Collections.emptyMap() : 
flowFile.getAttributes())) {
-                    schema = TableSchema.from(conn, catalog, schemaName, 
tableName, translateFieldNames, includePrimaryKeys);
-                    schemaCache.put(schemaKey, schema);
+                try (final Connection conn = 
dbcpService.getConnection(flowFile.getAttributes())) {
+                    return TableSchema.from(conn, catalog, schemaName, 
tableName, translateFieldNames, includePrimaryKeys);
                 } catch (final SQLException e) {
-                    getLogger().error("Failed to convert {} into a SQL 
statement due to {}; routing to failure", new Object[] {flowFile, 
e.toString()}, e);
-                    session.transfer(flowFile, REL_FAILURE);
-                    return;
+                    throw new ProcessException(e);
                 }
-            }
+            });
+        } catch (ProcessException e) {
+            getLogger().error("Failed to convert {} into a SQL statement due 
to {}; routing to failure", new Object[]{flowFile, e.toString()}, e);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
         }
 
         // Parse the JSON document
@@ -423,8 +422,8 @@ public class ConvertJSONToSQL extends AbstractProcessor {
             session.transfer(sqlFlowFile, REL_SQL);
         }
 
-        flowFile = copyAttributesToOriginal(session, flowFile, 
fragmentIdentifier, arrayNode.size());
-        session.transfer(flowFile, REL_ORIGINAL);
+        FlowFile newFlowFile = copyAttributesToOriginal(session, flowFile, 
fragmentIdentifier, arrayNode.size());
+        session.transfer(newFlowFile, REL_ORIGINAL);
     }
 
     private Set<String> getNormalizedColumnNames(final JsonNode node, final 
boolean translateFieldNames) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
index 6a2b15a..617ca9e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
@@ -24,12 +24,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -153,18 +154,12 @@ public class JoltTransformJSON extends AbstractProcessor {
     private volatile ClassLoader customClassLoader;
     private final static String DEFAULT_CHARSET = "UTF-8";
 
-    // Cache is guarded by synchronizing on 'this'.
-    private volatile int maxTransformsToCache = 10;
-    private final Map<String, JoltTransform> transformCache = new 
LinkedHashMap<String, JoltTransform>() {
-        @Override
-        protected boolean removeEldestEntry(Map.Entry<String, JoltTransform> 
eldest) {
-            final boolean evict = size() > maxTransformsToCache;
-            if (evict) {
-                getLogger().debug("Removing Jolt Transform from cache because 
cache is full");
-            }
-            return evict;
-        }
-    };
+    /**
+     * It is a cache for transform objects. It keep values indexed by jolt 
specification string.
+     * For some cases the key could be empty. It means that it represents 
default transform (e.g. for custom transform
+     * when there is no jolt-record-spec specified).
+     */
+    private LoadingCache<Optional<String>, JoltTransform> transformCache;
 
     static {
         final List<PropertyDescriptor> _properties = new ArrayList<>();
@@ -311,56 +306,22 @@ public class JoltTransformJSON extends AbstractProcessor {
     }
 
     private JoltTransform getTransform(final ProcessContext context, final 
FlowFile flowFile) throws Exception {
-        final String specString;
+        final Optional<String> specString;
         if (context.getProperty(JOLT_SPEC).isSet()) {
-            specString = 
context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue();
-        } else {
-            specString = null;
-        }
-
-        // Get the transform from our cache, if it exists.
-        JoltTransform transform = null;
-        synchronized (this) {
-            transform = transformCache.get(specString);
-        }
-
-        if (transform != null) {
-            return transform;
-        }
-
-        // If no transform for our spec, create the transform.
-        final Object specJson;
-        if (context.getProperty(JOLT_SPEC).isSet() && 
!SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
-            specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
-        } else {
-            specJson = null;
-        }
-
-        if 
(CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
-            transform = TransformFactory.getCustomTransform(customClassLoader, 
context.getProperty(CUSTOM_CLASS).getValue(), specJson);
+            specString = 
Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue());
         } else {
-            transform = TransformFactory.getTransform(customClassLoader, 
context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
+            specString = Optional.empty();
         }
 
-        // Check again for the transform in our cache, since it's possible 
that another thread has
-        // already populated it. If absent from the cache, populate the cache. 
Otherwise, use the
-        // value from the cache.
-        synchronized (this) {
-            final JoltTransform existingTransform = 
transformCache.get(specString);
-            if (existingTransform == null) {
-                transformCache.put(specString, transform);
-            } else {
-                transform = existingTransform;
-            }
-        }
-
-        return transform;
+        return transformCache.get(specString);
     }
 
     @OnScheduled
-    public synchronized void setup(final ProcessContext context) {
-        transformCache.clear();
-        maxTransformsToCache = 
context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+    public void setup(final ProcessContext context) {
+        int maxTransformsToCache = 
context.getProperty(TRANSFORM_CACHE_SIZE).asInteger();
+        transformCache = Caffeine.newBuilder()
+                .maximumSize(maxTransformsToCache)
+                .build(specString -> createTransform(context, 
specString.orElse(null)));
 
         try {
             if (context.getProperty(MODULES).isSet()) {
@@ -373,6 +334,21 @@ public class JoltTransformJSON extends AbstractProcessor {
         }
     }
 
+    private JoltTransform createTransform(final ProcessContext context, final 
String specString) throws Exception {
+        final Object specJson;
+        if (context.getProperty(JOLT_SPEC).isSet() && 
!SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
+            specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET);
+        } else {
+            specJson = null;
+        }
+
+        if 
(CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) {
+            return 
TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(),
 context.getProperty(CUSTOM_CLASS).getValue(), specJson);
+        } else {
+            return 
TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), 
context.getProperty(JOLT_TRANSFORM).getValue(), specJson);
+        }
+    }
+
     protected FilenameFilter getJarFilenameFilter(){
         return (dir, name) -> (name != null && name.endsWith(".jar"));
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index f4a4f5c..2f2d901 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.processors.standard;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -69,7 +71,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -265,17 +266,18 @@ public class PutDatabaseRecord extends 
AbstractSessionFactoryProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
-    protected static List<PropertyDescriptor> propDescriptors;
-
-    private final Map<SchemaKey, TableSchema> schemaCache = new 
LinkedHashMap<SchemaKey, TableSchema>(100) {
-        private static final long serialVersionUID = 1L;
+    static final PropertyDescriptor TABLE_SCHEMA_CACHE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("table-schema-cache-size")
+            .displayName("Table Schema Cache Size")
+            .description("Specifies how many Table Schemas should be cached")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .required(true)
+            .build();
 
-        @Override
-        protected boolean removeEldestEntry(Map.Entry<SchemaKey, TableSchema> 
eldest) {
-            return size() >= 100;
-        }
-    };
+    protected static List<PropertyDescriptor> propDescriptors;
 
+    private Cache<SchemaKey, TableSchema> schemaCache;
 
     static {
         final Set<Relationship> r = new HashSet<>();
@@ -300,6 +302,7 @@ public class PutDatabaseRecord extends 
AbstractSessionFactoryProcessor {
         pds.add(QUOTED_TABLE_IDENTIFIER);
         pds.add(QUERY_TIMEOUT);
         pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
+        pds.add(TABLE_SCHEMA_CACHE_SIZE);
 
         propDescriptors = Collections.unmodifiableList(pds);
     }
@@ -410,9 +413,10 @@ public class PutDatabaseRecord extends 
AbstractSessionFactoryProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
-        synchronized (this) {
-            schemaCache.clear();
-        }
+        final int tableSchemaCacheSize = 
context.getProperty(TABLE_SCHEMA_CACHE_SIZE).asInteger();
+        schemaCache = Caffeine.newBuilder()
+                .maximumSize(tableSchemaCacheSize)
+                .build();
 
         process = new Put<>();
 
@@ -582,19 +586,13 @@ public class PutDatabaseRecord extends 
AbstractSessionFactoryProcessor {
         // cached but the primary keys will not be retrieved, causing future 
UPDATE statements to not have primary keys available
         final boolean includePrimaryKeys = updateKeys == null;
 
-        // get the database schema from the cache, if one exists. We do this 
in a synchronized block, rather than
-        // using a ConcurrentMap because the Map that we are using is a 
LinkedHashMap with a capacity such that if
-        // the Map grows beyond this capacity, old elements are evicted. We do 
this in order to avoid filling the
-        // Java Heap if there are a lot of different SQL statements being 
generated that reference different tables.
-        TableSchema tableSchema;
-        synchronized (this) {
-            tableSchema = schemaCache.get(schemaKey);
-            if (tableSchema == null) {
-                // No schema exists for this table yet. Query the database to 
determine the schema and put it into the cache.
-                tableSchema = TableSchema.from(con, catalog, schemaName, 
tableName, settings.translateFieldNames, includePrimaryKeys);
-                schemaCache.put(schemaKey, tableSchema);
+        TableSchema tableSchema = schemaCache.get(schemaKey, key -> {
+            try {
+                return TableSchema.from(con, catalog, schemaName, tableName, 
settings.translateFieldNames, includePrimaryKeys);
+            } catch (SQLException e) {
+                throw new ProcessException(e);
             }
-        }
+        });
         if (tableSchema == null) {
             throw new IllegalArgumentException("No table schema specified!");
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 60992f7..140a74b 100755
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -114,6 +114,11 @@
             <artifactId>commons-text</artifactId>
             <version>1.4</version>
         </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.6.2</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
index 97643aa..434d73f 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -17,13 +17,18 @@
 
 package org.apache.nifi.avro;
 
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.avro.Schema;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
@@ -35,7 +40,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -46,15 +50,32 @@ import java.util.Optional;
 public class AvroReader extends SchemaRegistryService implements 
RecordReaderFactory {
     private final AllowableValue EMBEDDED_AVRO_SCHEMA = new 
AllowableValue("embedded-avro-schema",
         "Use Embedded Avro Schema", "The FlowFile has the Avro Schema embedded 
within the content, and this schema will be used.");
-    private static final int MAX_AVRO_SCHEMA_CACHE_SIZE = 20;
 
-    private final Map<String, Schema> compiledAvroSchemaCache = new 
LinkedHashMap<String, Schema>() {
-        @Override
-        protected boolean removeEldestEntry(final Map.Entry<String, Schema> 
eldest) {
-            return size() >= MAX_AVRO_SCHEMA_CACHE_SIZE;
-        }
-    };
+    static final PropertyDescriptor CACHE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("cache-size")
+            .displayName("Cache Size")
+            .description("Specifies how many Schemas should be cached")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("1000")
+            .required(true)
+            .build();
 
+    private LoadingCache<String, Schema> compiledAvroSchemaCache;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(CACHE_SIZE);
+        return properties;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
+        compiledAvroSchemaCache = Caffeine.newBuilder()
+                .maximumSize(cacheSize)
+                .build(schemaText -> new Schema.Parser().parse(schemaText));
+    }
 
     @Override
     protected List<AllowableValue> getSchemaAccessStrategyValues() {
@@ -94,7 +115,7 @@ public class AvroReader extends SchemaRegistryService 
implements RecordReaderFac
                 if (recordSchema.getSchemaFormat().isPresent() & 
recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {
                     final Optional<String> textOption = 
recordSchema.getSchemaText();
                     if (textOption.isPresent()) {
-                        avroSchema = compileAvroSchema(textOption.get());
+                        avroSchema = 
compiledAvroSchemaCache.get(textOption.get());
                     } else {
                         avroSchema = 
AvroTypeUtil.extractAvroSchema(recordSchema);
                     }
@@ -109,29 +130,6 @@ public class AvroReader extends SchemaRegistryService 
implements RecordReaderFac
         }
     }
 
-    private Schema compileAvroSchema(final String text) {
-        // Access to the LinkedHashMap must be done while synchronized on this.
-        // However, if no compiled schema exists, we don't want to remain 
synchronized
-        // while we compile it, as compilation can be expensive. As a result, 
if there is
-        // not a compiled schema already, we will compile it outside of the 
synchronized
-        // block, and then re-synchronize to update the map. All of this is 
functionally
-        // equivalent to calling compiledAvroSchema.computeIfAbsent(text, t -> 
new Schema.Parser().parse(t));
-        // but does so without synchronizing when not necessary.
-        Schema compiled;
-        synchronized (this) {
-            compiled = compiledAvroSchemaCache.get(text);
-        }
-
-        if (compiled != null) {
-            return compiled;
-        }
-
-        final Schema newlyCompiled = new Schema.Parser().parse(text);
-        synchronized (this) {
-            return compiledAvroSchemaCache.computeIfAbsent(text, t -> 
newlyCompiled);
-        }
-    }
-
     @Override
     protected AllowableValue getDefaultSchemaAccessStrategy() {
         return EMBEDDED_AVRO_SCHEMA;

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index 7e49841..93d36dc 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -22,22 +22,25 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.schema.access.SchemaField;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
@@ -49,7 +52,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
 @CapabilityDescription("Writes the contents of a RecordSet in Binary Avro 
format.")
 public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter 
implements RecordSetWriterFactory {
     private static final Set<SchemaField> requiredSchemaFields = 
EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
-    private static final int MAX_AVRO_SCHEMA_CACHE_SIZE = 20;
 
     private enum CodecType {
         BZIP2,
@@ -68,16 +70,28 @@ public class AvroRecordSetWriter extends 
SchemaRegistryRecordSetWriter implement
         .required(true)
         .build();
 
-    private final Map<String, Schema> compiledAvroSchemaCache = new 
LinkedHashMap<String, Schema>() {
-        @Override
-        protected boolean removeEldestEntry(final Map.Entry<String, Schema> 
eldest) {
-            return size() >= MAX_AVRO_SCHEMA_CACHE_SIZE;
-        }
-    };
+    private LoadingCache<String, Schema> compiledAvroSchemaCache;
 
     static final AllowableValue AVRO_EMBEDDED = new 
AllowableValue("avro-embedded", "Embed Avro Schema",
         "The FlowFile will have the Avro schema embedded into the content, as 
is typical with Avro");
 
+    static final PropertyDescriptor CACHE_SIZE = new 
PropertyDescriptor.Builder()
+            .name("cache-size")
+            .displayName("Cache Size")
+            .description("Specifies how many Schemas should be cached")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("1000")
+            .required(true)
+            .build();
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
+        compiledAvroSchemaCache = Caffeine.newBuilder()
+                .maximumSize(cacheSize)
+                .build(schemaText -> new Schema.Parser().parse(schemaText));
+    }
+
     @Override
     public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema recordSchema, final OutputStream out) throws IOException {
         final String strategyValue = 
getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
@@ -89,7 +103,7 @@ public class AvroRecordSetWriter extends 
SchemaRegistryRecordSetWriter implement
                 if (recordSchema.getSchemaFormat().isPresent() && 
recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {
                     final Optional<String> textOption = 
recordSchema.getSchemaText();
                     if (textOption.isPresent()) {
-                        avroSchema = compileAvroSchema(textOption.get());
+                        avroSchema = 
compiledAvroSchemaCache.get(textOption.get());
                     } else {
                         avroSchema = 
AvroTypeUtil.extractAvroSchema(recordSchema);
                     }
@@ -110,30 +124,6 @@ public class AvroRecordSetWriter extends 
SchemaRegistryRecordSetWriter implement
         }
     }
 
-
-    private Schema compileAvroSchema(final String text) {
-        // Access to the LinkedHashMap must be done while synchronized on this.
-        // However, if no compiled schema exists, we don't want to remain 
synchronized
-        // while we compile it, as compilation can be expensive. As a result, 
if there is
-        // not a compiled schema already, we will compile it outside of the 
synchronized
-        // block, and then re-synchronize to update the map. All of this is 
functionally
-        // equivalent to calling compiledAvroSchema.computeIfAbsent(text, t -> 
new Schema.Parser().parse(t));
-        // but does so without synchronizing when not necessary.
-        Schema compiled;
-        synchronized (this) {
-            compiled = compiledAvroSchemaCache.get(text);
-        }
-
-        if (compiled != null) {
-            return compiled;
-        }
-
-        final Schema newlyCompiled = new Schema.Parser().parse(text);
-        synchronized (this) {
-            return compiledAvroSchemaCache.computeIfAbsent(text, t -> 
newlyCompiled);
-        }
-    }
-
     private CodecFactory getCodecFactory(String property) {
         CodecType type = CodecType.valueOf(property);
         switch (type) {
@@ -155,6 +145,7 @@ public class AvroRecordSetWriter extends 
SchemaRegistryRecordSetWriter implement
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
         properties.add(COMPRESSION_FORMAT);
+        properties.add(CACHE_SIZE);
         return properties;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/pom.xml
 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/pom.xml
index d609faa..42dde93 100644
--- 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/pom.xml
+++ 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/pom.xml
@@ -38,6 +38,11 @@
             <version>3.8.1</version>
         </dependency>
         <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>2.6.2</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
index cee8d22..9b5a98a 100644
--- 
a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
+++ 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.processors.attributes;
 
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.EventDriven;
@@ -65,7 +67,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -97,12 +98,18 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
     private final static Set<Relationship> statelessRelationshipSet;
     private final static Set<Relationship> statefulRelationshipSet;
 
-    private final Map<String, String> canonicalValueLookup = new 
LinkedHashMap<String, String>() {
-        @Override
-        protected boolean removeEldestEntry(final Map.Entry eldest) {
-            return size() > 100;
-        }
-    };
+    /**
+     * This field caches a 'canonical' value for a given attribute value. When 
this processor is used to update an attribute or add a new
+     * attribute, if Expression Language is used, we may well end up with a 
new String object for each attribute for each FlowFile. As a result,
+     * we will store a different String object for the attribute value of 
every FlowFile, meaning that we have to keep a lot of String objects
+     * in heap. By using this 'canonical lookup', we are able to keep only a 
single String object on the heap.
+     *
+     * For example, if we have a property named "abc" and the value is 
"${abc}${xyz}", and we send through 1,000 FlowFiles with attributes abc="abc"
+     * and xyz="xyz", then would end up with 1,000 String objects with a value 
of "abcxyz". By using this canonical representation, we are able to
+     * instead hold a single String whose value is "abcxyz" instead of holding 
1,000 String objects in heap (1,000 String objects may still be created
+     * when calling PropertyValue.evaluateAttributeExpressions, but this way 
those values are garbage collected).
+     */
+    private LoadingCache<String, String> canonicalValueLookup;
 
     // relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -185,6 +192,15 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
             .addValidator(Validator.VALID)
             .build();
 
+    public static final PropertyDescriptor CANONICAL_VALUE_LOOKUP_CACHE_SIZE = 
new PropertyDescriptor.Builder()
+            .name("canonical-value-lookup-cache-size")
+            .displayName("Cache Value Lookup Cache Size")
+            .description("Specifies how many canonical lookup values should be 
stored in the cache")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .required(true)
+            .build();
+
     private volatile Map<String, Action> defaultActions;
     private volatile boolean debugEnabled;
     private volatile boolean stateful = false;
@@ -205,6 +221,7 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
         descriptors.add(DELETE_ATTRIBUTES);
         descriptors.add(STORE_STATE);
         descriptors.add(STATEFUL_VARIABLES_INIT_VALUE);
+        descriptors.add(CANONICAL_VALUE_LOOKUP_CACHE_SIZE);
         return Collections.unmodifiableList(descriptors);
     }
 
@@ -245,6 +262,11 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws IOException {
+        final int cacheSize = 
context.getProperty(CANONICAL_VALUE_LOOKUP_CACHE_SIZE).asInteger();
+        canonicalValueLookup = Caffeine.newBuilder()
+                .maximumSize(cacheSize)
+                .build(attributeValue -> attributeValue);
+
         
criteriaCache.set(CriteriaSerDe.deserialize(context.getAnnotationData()));
 
         propertyValues.clear();
@@ -594,30 +616,6 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
         }
     }
 
-    /**
-     * This method caches a 'canonical' value for a given attribute value. 
When this processor is used to update an attribute or add a new
-     * attribute, if Expression Language is used, we may well end up with a 
new String object for each attribute for each FlowFile. As a result,
-     * we will store a different String object for the attribute value of 
every FlowFile, meaning that we have to keep a lot of String objects
-     * in heap. By using this 'canonical lookup', we are able to keep only a 
single String object on the heap.
-     *
-     * For example, if we have a property named "abc" and the value is 
"${abc}${xyz}", and we send through 1,000 FlowFiles with attributes abc="abc"
-     * and xyz="xyz", then would end up with 1,000 String objects with a value 
of "abcxyz". By using this canonical representation, we are able to
-     * instead hold a single String whose value is "abcxyz" instead of holding 
1,000 String objects in heap (1,000 String objects may still be created
-     * when calling PropertyValue.evaluateAttributeExpressions, but this way 
those values are garbage collected).
-     *
-     * @param attributeValue the value whose canonical value should be return
-     * @return the canonical representation of the given attribute value
-     */
-    private synchronized String getCanonicalRepresentation(final String 
attributeValue) {
-        final String canonical = this.canonicalValueLookup.get(attributeValue);
-        if (canonical != null) {
-            return canonical;
-        }
-
-        this.canonicalValueLookup.put(attributeValue, attributeValue);
-        return attributeValue;
-    }
-
     // Executes the specified action on the specified flowfile.
     private FlowFile executeActions(final ProcessSession session, final 
ProcessContext context, final List<Rule> rules, final Map<String, Action> 
defaultActions, final FlowFile flowfile,
                                     final Map<String, String> 
stateInitialAttributes, final Map<String, String> stateWorkingAttributes) {
@@ -688,7 +686,7 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
                 if (notDeleted || setStatefulAttribute) {
                     try {
                         String newAttributeValue = 
getPropertyValue(action.getValue(), 
context).evaluateAttributeExpressions(flowfile, null, null, 
stateInitialAttributes).getValue();
-                        newAttributeValue = 
getCanonicalRepresentation(newAttributeValue);
+                        newAttributeValue = 
canonicalValueLookup.get(newAttributeValue);
 
                         // log if appropriate
                         if (debugEnabled) {
@@ -746,7 +744,8 @@ public class UpdateAttribute extends AbstractProcessor 
implements Searchable {
         final Map<String, Action> defaultActions = new HashMap<>();
 
         for (final Map.Entry<PropertyDescriptor, String> entry : 
properties.entrySet()) {
-            if(entry.getKey() != STORE_STATE && entry.getKey() != 
STATEFUL_VARIABLES_INIT_VALUE) {
+            if(entry.getKey() != STORE_STATE && entry.getKey() != 
STATEFUL_VARIABLES_INIT_VALUE
+                    && entry.getKey() != CANONICAL_VALUE_LOOKUP_CACHE_SIZE) {
                 final Action action = new Action();
                 action.setAttribute(entry.getKey().getName());
                 action.setValue(entry.getValue());

Reply via email to