Repository: nifi Updated Branches: refs/heads/master a628aced6 -> 765df6781
NIFI-5757 Using Caffeine instead of slow synchronization on LinkedHashMap for caches - mainly avro schema caches This closes #3111. Signed-off-by: Mark Payne <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/765df678 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/765df678 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/765df678 Branch: refs/heads/master Commit: 765df6781782d0988e958fb69d9825797a086fe7 Parents: a628ace Author: Arek Burdach <[email protected]> Authored: Fri Oct 26 16:49:50 2018 +0200 Committer: Mark Payne <[email protected]> Committed: Fri Nov 9 14:50:24 2018 -0500 ---------------------------------------------------------------------- nifi-commons/nifi-record-path/pom.xml | 5 + .../nifi/record/path/util/RecordPathCache.java | 36 ++----- .../client/CachingSchemaRegistryClient.java | 104 +++---------------- .../nifi-avro-record-utils/pom.xml | 5 + .../WriteAvroSchemaAttributeStrategy.java | 32 ++---- .../nifi-jolt-record-processors/pom.xml | 5 + .../jolt/record/JoltTransformRecord.java | 73 +++++-------- .../processors/standard/ConvertJSONToSQL.java | 61 ++++++----- .../processors/standard/JoltTransformJSON.java | 90 ++++++---------- .../processors/standard/PutDatabaseRecord.java | 48 ++++----- .../nifi-record-serialization-services/pom.xml | 5 + .../java/org/apache/nifi/avro/AvroReader.java | 62 ++++++----- .../apache/nifi/avro/AvroRecordSetWriter.java | 59 +++++------ .../nifi-update-attribute-processor/pom.xml | 5 + .../processors/attributes/UpdateAttribute.java | 65 ++++++------ 15 files changed, 253 insertions(+), 402 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-commons/nifi-record-path/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/pom.xml 
b/nifi-commons/nifi-record-path/pom.xml index 364654c..29a5984 100644 --- a/nifi-commons/nifi-record-path/pom.xml +++ b/nifi-commons/nifi-record-path/pom.xml @@ -71,5 +71,10 @@ <artifactId>antlr-runtime</artifactId> <version>3.5.2</version> </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>2.6.2</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java index 243ad11..8cc9e24 100644 --- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java @@ -17,42 +17,20 @@ package org.apache.nifi.record.path.util; -import java.util.LinkedHashMap; -import java.util.Map; - +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.nifi.record.path.RecordPath; public class RecordPathCache { - private final Map<String, RecordPath> compiledRecordPaths; + private final LoadingCache<String, RecordPath> compiledRecordPaths; public RecordPathCache(final int cacheSize) { - compiledRecordPaths = new LinkedHashMap<String, RecordPath>() { - @Override - protected boolean removeEldestEntry(final Map.Entry<String, RecordPath> eldest) { - return size() >= cacheSize; - } - }; + compiledRecordPaths = Caffeine.newBuilder() + .maximumSize(cacheSize) + .build(RecordPath::compile); } public RecordPath getCompiled(final String path) { - RecordPath compiled; - synchronized (this) { - 
compiled = compiledRecordPaths.get(path); - } - - if (compiled != null) { - return compiled; - } - - compiled = RecordPath.compile(path); - - synchronized (this) { - final RecordPath existing = compiledRecordPaths.putIfAbsent(path, compiled); - if (existing != null) { - compiled = existing; - } - } - - return compiled; + return compiledRecordPaths.get(path); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java index d82befe..9075ac2 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java @@ -17,113 +17,43 @@ package org.apache.nifi.confluent.schemaregistry.client; -import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.Map; - +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.IOException; +import java.time.Duration; + public class CachingSchemaRegistryClient implements SchemaRegistryClient { private 
final SchemaRegistryClient client; - private final long expirationNanos; - private final Map<String, CachedRecordSchema> nameCache; - private final Map<Integer, CachedRecordSchema> idCache; + private final LoadingCache<String, RecordSchema> nameCache; + private final LoadingCache<Integer, RecordSchema> idCache; public CachingSchemaRegistryClient(final SchemaRegistryClient toWrap, final int cacheSize, final long expirationNanos) { this.client = toWrap; - this.expirationNanos = expirationNanos; - nameCache = new Cache<>(cacheSize); - idCache = new Cache<>(cacheSize); + nameCache = Caffeine.newBuilder() + .maximumSize(cacheSize) + .expireAfterWrite(Duration.ofNanos(expirationNanos)) + .build(client::getSchema); + idCache = Caffeine.newBuilder() + .maximumSize(cacheSize) + .expireAfterWrite(Duration.ofNanos(expirationNanos)) + .build(client::getSchema); } @Override public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException { - RecordSchema schema = getFromCache(nameCache, schemaName); - if (schema != null) { - return schema; - } - - schema = client.getSchema(schemaName); - - synchronized (nameCache) { - nameCache.put(schemaName, new CachedRecordSchema(schema)); - } - - return schema; + return nameCache.get(schemaName); } @Override public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException { - RecordSchema schema = getFromCache(idCache, schemaId); - if (schema != null) { - return schema; - } - - schema = client.getSchema(schemaId); - - synchronized (idCache) { - idCache.put(schemaId, new CachedRecordSchema(schema)); - } - - return schema; - } - - private RecordSchema getFromCache(final Map<?, CachedRecordSchema> cache, final Object key) { - final CachedRecordSchema cachedSchema; - synchronized (cache) { - cachedSchema = cache.get(key); - } - - if (cachedSchema == null) { - return null; - } - - if (cachedSchema.isOlderThan(System.nanoTime() - expirationNanos)) { - return null; - } - - return 
cachedSchema.getSchema(); - } - - - private static class Cache<K, V> extends LinkedHashMap<K, V> { - private final int cacheSize; - - public Cache(final int cacheSize) { - this.cacheSize = cacheSize; - } - - @Override - protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) { - return size() >= cacheSize; - } + return idCache.get(schemaId); } - - private static class CachedRecordSchema { - private final RecordSchema schema; - private final long cachedTimestamp; - - public CachedRecordSchema(final RecordSchema schema) { - this(schema, System.nanoTime()); - } - - public CachedRecordSchema(final RecordSchema schema, final long timestamp) { - this.schema = schema; - this.cachedTimestamp = timestamp; - } - - public RecordSchema getSchema() { - return schema; - } - - public boolean isOlderThan(final long timestamp) { - return cachedTimestamp < timestamp; - } - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml index 15ab69b..5a3c618 100755 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml @@ -34,6 +34,11 @@ <artifactId>avro</artifactId> <version>1.8.1</version> </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>2.6.2</version> + </dependency> <!-- Other modules using nifi-avro-record-utils are expected to have these APIs available, typically through a NAR dependency --> <dependency> <groupId>org.apache.nifi</groupId> 
http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java index 36484a5..97389a9 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java @@ -17,7 +17,8 @@ package org.apache.nifi.schema.access; -import org.apache.avro.Schema; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.serialization.record.RecordSchema; @@ -25,18 +26,14 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Collections; import java.util.EnumSet; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.Set; public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter { - private final Map<RecordSchema, String> avroSchemaTextCache = new LinkedHashMap<RecordSchema, String>() { - @Override - protected boolean removeEldestEntry(Map.Entry<RecordSchema, String> eldest) { - return size() > 10; - } - }; + private final LoadingCache<RecordSchema, String> avroSchemaTextCache = Caffeine.newBuilder() + .maximumSize(10) + .build(schema -> 
AvroTypeUtil.extractAvroSchema(schema).toString()); @Override public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException { @@ -53,24 +50,7 @@ public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter { } } - String schemaText; - synchronized (avroSchemaTextCache) { - schemaText = avroSchemaTextCache.get(schema); - } - - if (schemaText == null) { - final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema); - schemaText = avroSchema.toString(); - - synchronized (avroSchemaTextCache) { - final String existing = avroSchemaTextCache.putIfAbsent(schema, schemaText); - if (existing != null) { - schemaText = existing; - } - } - } - - return Collections.singletonMap("avro.schema", schemaText); + return Collections.singletonMap("avro.schema", avroSchemaTextCache.get(schema)); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml index 75793e7..bfd6deb 100644 --- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml @@ -53,6 +53,11 @@ <artifactId>json-utils</artifactId> </dependency> <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>2.6.2</version> + </dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <version>1.9.0-SNAPSHOT</version> http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java index 86b721f..6ce4161 100644 --- a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java @@ -20,6 +20,8 @@ import com.bazaarvoice.jolt.ContextualTransform; import com.bazaarvoice.jolt.JoltTransform; import com.bazaarvoice.jolt.JsonUtils; import com.bazaarvoice.jolt.Transform; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SideEffectFree; @@ -65,10 +67,10 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -190,18 +192,12 @@ public class JoltTransformRecord extends AbstractProcessor { private final static Set<Relationship> relationships; private final static String DEFAULT_CHARSET = "UTF-8"; - // Cache is guarded by synchronizing on 'this'. 
- private volatile int maxTransformsToCache = 10; - private final Map<String, JoltTransform> transformCache = new LinkedHashMap<String, JoltTransform>() { - @Override - protected boolean removeEldestEntry(Map.Entry<String, JoltTransform> eldest) { - final boolean evict = size() > maxTransformsToCache; - if (evict) { - getLogger().debug("Removing Jolt Transform from cache because cache is full"); - } - return evict; - } - }; + /** + * It is a cache for transform objects. It keeps values indexed by jolt specification string. + * In some cases the key could be empty. It means that it represents the default transform (e.g. for a custom transform + * when there is no jolt-record-spec specified). + */ + private LoadingCache<Optional<String>, JoltTransform> transformCache; static { final List<PropertyDescriptor> _properties = new ArrayList<>(); @@ -352,25 +348,26 @@ public class JoltTransformRecord extends AbstractProcessor { } } - private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) throws Exception { - final String specString; + private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) { + final Optional<String> specString; if (context.getProperty(JOLT_SPEC).isSet()) { - specString = context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue(); + specString = Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue()); } else { - specString = null; + specString = Optional.empty(); } - // Get the transform from our cache, if it exists. 
- JoltTransform transform; - synchronized (this) { - transform = transformCache.get(specString); - } + return transformCache.get(specString); + } - if (transform != null) { - return transform; - } + @OnScheduled + public void setup(final ProcessContext context) { + int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger(); + transformCache = Caffeine.newBuilder() + .maximumSize(maxTransformsToCache) + .build(specString -> createTransform(context, specString.orElse(null))); + } - // If no transform for our spec, create the transform. + private JoltTransform createTransform(final ProcessContext context, final String specString) throws Exception { final Object specJson; if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET); @@ -379,30 +376,10 @@ public class JoltTransformRecord extends AbstractProcessor { } if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { - transform = TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson); + return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson); } else { - transform = TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson); - } - - // Check again for the transform in our cache, since it's possible that another thread has - // already populated it. If absent from the cache, populate the cache. Otherwise, use the - // value from the cache. 
- synchronized (this) { - final JoltTransform existingTransform = transformCache.get(specString); - if (existingTransform == null) { - transformCache.put(specString, transform); - } else { - transform = existingTransform; - } + return TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson); } - - return transform; - } - - @OnScheduled - public synchronized void setup(final ProcessContext context) { - transformCache.clear(); - maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger(); } protected FilenameFilter getJarFilenameFilter() { http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java index f198013..312c481 100755 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertJSONToSQL.java @@ -28,17 +28,17 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import com.github.benmanes.caffeine.cache.Cache; +import 
com.github.benmanes.caffeine.cache.Caffeine; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; @@ -216,6 +216,15 @@ public class ConvertJSONToSQL extends AbstractProcessor { .defaultValue("sql") .build(); + static final PropertyDescriptor TABLE_SCHEMA_CACHE_SIZE = new PropertyDescriptor.Builder() + .name("table-schema-cache-size") + .displayName("Table Schema Cache Size") + .description("Specifies how many Table Schemas should be cached") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("100") + .required(true) + .build(); + static final Relationship REL_ORIGINAL = new Relationship.Builder() .name("original") .description("When a FlowFile is converted to SQL, the original JSON FlowFile is routed to this relationship") @@ -230,14 +239,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { + "content or the JSON content missing a required field (if using an INSERT statement type).") .build(); - private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) { - private static final long serialVersionUID = 1L; - - @Override - protected boolean removeEldestEntry(Map.Entry<SchemaKey,TableSchema> eldest) { - return size() >= 100; - } - }; + private Cache<SchemaKey, TableSchema> schemaCache; @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -254,6 +256,7 @@ public class ConvertJSONToSQL extends AbstractProcessor { properties.add(QUOTED_IDENTIFIERS); properties.add(QUOTED_TABLE_IDENTIFIER); properties.add(SQL_PARAM_ATTR_PREFIX); + properties.add(TABLE_SCHEMA_CACHE_SIZE); return properties; } @@ -270,14 +273,15 @@ public class ConvertJSONToSQL extends AbstractProcessor { @OnScheduled public void onScheduled(final ProcessContext context) { - synchronized (this) { - schemaCache.clear(); - } + final int tableSchemaCacheSize 
= context.getProperty(TABLE_SCHEMA_CACHE_SIZE).asInteger(); + schemaCache = Caffeine.newBuilder() + .maximumSize(tableSchemaCacheSize) + .build(); } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); + final FlowFile flowFile = session.get(); if (flowFile == null) { return; } @@ -306,25 +310,20 @@ public class ConvertJSONToSQL extends AbstractProcessor { // Attribute prefix final String attributePrefix = context.getProperty(SQL_PARAM_ATTR_PREFIX).evaluateAttributeExpressions(flowFile).getValue(); - // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than - // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if - // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the - // Java Heap if there are a lot of different SQL statements being generated that reference different tables. TableSchema schema; - synchronized (this) { - schema = schemaCache.get(schemaKey); - if (schema == null) { - // No schema exists for this table yet. Query the database to determine the schema and put it into the cache. + try { + schema = schemaCache.get(schemaKey, key -> { final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); - try (final Connection conn = dbcpService.getConnection(flowFile == null ? 
Collections.emptyMap() : flowFile.getAttributes())) { - schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys); - schemaCache.put(schemaKey, schema); + try (final Connection conn = dbcpService.getConnection(flowFile.getAttributes())) { + return TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys); } catch (final SQLException e) { - getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[] {flowFile, e.toString()}, e); - session.transfer(flowFile, REL_FAILURE); - return; + throw new ProcessException(e); } - } + }); + } catch (ProcessException e) { + getLogger().error("Failed to convert {} into a SQL statement due to {}; routing to failure", new Object[]{flowFile, e.toString()}, e); + session.transfer(flowFile, REL_FAILURE); + return; } // Parse the JSON document @@ -423,8 +422,8 @@ public class ConvertJSONToSQL extends AbstractProcessor { session.transfer(sqlFlowFile, REL_SQL); } - flowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, arrayNode.size()); - session.transfer(flowFile, REL_ORIGINAL); + FlowFile newFlowFile = copyAttributesToOriginal(session, flowFile, fragmentIdentifier, arrayNode.size()); + session.transfer(newFlowFile, REL_ORIGINAL); } private Set<String> getNormalizedColumnNames(final JsonNode node, final boolean translateFieldNames) { http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java index 
6a2b15a..617ca9e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java @@ -24,12 +24,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Set; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SideEffectFree; @@ -153,18 +154,12 @@ public class JoltTransformJSON extends AbstractProcessor { private volatile ClassLoader customClassLoader; private final static String DEFAULT_CHARSET = "UTF-8"; - // Cache is guarded by synchronizing on 'this'. - private volatile int maxTransformsToCache = 10; - private final Map<String, JoltTransform> transformCache = new LinkedHashMap<String, JoltTransform>() { - @Override - protected boolean removeEldestEntry(Map.Entry<String, JoltTransform> eldest) { - final boolean evict = size() > maxTransformsToCache; - if (evict) { - getLogger().debug("Removing Jolt Transform from cache because cache is full"); - } - return evict; - } - }; + /** + * It is a cache for transform objects. It keeps values indexed by jolt specification string. + * In some cases the key could be empty. It means that it represents the default transform (e.g. for a custom transform + * when there is no jolt-record-spec specified). 
+ */ + private LoadingCache<Optional<String>, JoltTransform> transformCache; static { final List<PropertyDescriptor> _properties = new ArrayList<>(); @@ -311,56 +306,22 @@ public class JoltTransformJSON extends AbstractProcessor { } private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) throws Exception { - final String specString; + final Optional<String> specString; if (context.getProperty(JOLT_SPEC).isSet()) { - specString = context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue(); - } else { - specString = null; - } - - // Get the transform from our cache, if it exists. - JoltTransform transform = null; - synchronized (this) { - transform = transformCache.get(specString); - } - - if (transform != null) { - return transform; - } - - // If no transform for our spec, create the transform. - final Object specJson; - if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { - specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET); - } else { - specJson = null; - } - - if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { - transform = TransformFactory.getCustomTransform(customClassLoader, context.getProperty(CUSTOM_CLASS).getValue(), specJson); + specString = Optional.of(context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue()); } else { - transform = TransformFactory.getTransform(customClassLoader, context.getProperty(JOLT_TRANSFORM).getValue(), specJson); + specString = Optional.empty(); } - // Check again for the transform in our cache, since it's possible that another thread has - // already populated it. If absent from the cache, populate the cache. Otherwise, use the - // value from the cache. 
- synchronized (this) { - final JoltTransform existingTransform = transformCache.get(specString); - if (existingTransform == null) { - transformCache.put(specString, transform); - } else { - transform = existingTransform; - } - } - - return transform; + return transformCache.get(specString); } @OnScheduled - public synchronized void setup(final ProcessContext context) { - transformCache.clear(); - maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger(); + public void setup(final ProcessContext context) { + int maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger(); + transformCache = Caffeine.newBuilder() + .maximumSize(maxTransformsToCache) + .build(specString -> createTransform(context, specString.orElse(null))); try { if (context.getProperty(MODULES).isSet()) { @@ -373,6 +334,21 @@ public class JoltTransformJSON extends AbstractProcessor { } } + private JoltTransform createTransform(final ProcessContext context, final String specString) throws Exception { + final Object specJson; + if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { + specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET); + } else { + specJson = null; + } + + if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { + return TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson); + } else { + return TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson); + } + } + protected FilenameFilter getJarFilenameFilter(){ return (dir, name) -> (name != null && name.endsWith(".jar")); } http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index f4a4f5c..2f2d901 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.processors.standard; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -69,7 +71,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -265,17 +266,18 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); - protected static List<PropertyDescriptor> propDescriptors; - - private final Map<SchemaKey, TableSchema> schemaCache = new LinkedHashMap<SchemaKey, TableSchema>(100) { - private static final long serialVersionUID = 1L; + static final PropertyDescriptor TABLE_SCHEMA_CACHE_SIZE = new PropertyDescriptor.Builder() + .name("table-schema-cache-size") + .displayName("Table Schema Cache Size") + .description("Specifies how many Table Schemas should be cached") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("100") + .required(true) + .build(); - @Override 
- protected boolean removeEldestEntry(Map.Entry<SchemaKey, TableSchema> eldest) { - return size() >= 100; - } - }; + protected static List<PropertyDescriptor> propDescriptors; + private Cache<SchemaKey, TableSchema> schemaCache; static { final Set<Relationship> r = new HashSet<>(); @@ -300,6 +302,7 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { pds.add(QUOTED_TABLE_IDENTIFIER); pds.add(QUERY_TIMEOUT); pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE); + pds.add(TABLE_SCHEMA_CACHE_SIZE); propDescriptors = Collections.unmodifiableList(pds); } @@ -410,9 +413,10 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { @OnScheduled public void onScheduled(final ProcessContext context) { - synchronized (this) { - schemaCache.clear(); - } + final int tableSchemaCacheSize = context.getProperty(TABLE_SCHEMA_CACHE_SIZE).asInteger(); + schemaCache = Caffeine.newBuilder() + .maximumSize(tableSchemaCacheSize) + .build(); process = new Put<>(); @@ -582,19 +586,13 @@ public class PutDatabaseRecord extends AbstractSessionFactoryProcessor { // cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available final boolean includePrimaryKeys = updateKeys == null; - // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than - // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if - // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the - // Java Heap if there are a lot of different SQL statements being generated that reference different tables. - TableSchema tableSchema; - synchronized (this) { - tableSchema = schemaCache.get(schemaKey); - if (tableSchema == null) { - // No schema exists for this table yet. Query the database to determine the schema and put it into the cache. 
- tableSchema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys); - schemaCache.put(schemaKey, tableSchema); + TableSchema tableSchema = schemaCache.get(schemaKey, key -> { + try { + return TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys); + } catch (SQLException e) { + throw new ProcessException(e); } - } + }); if (tableSchema == null) { throw new IllegalArgumentException("No table schema specified!"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 60992f7..140a74b 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -114,6 +114,11 @@ <artifactId>commons-text</artifactId> <version>1.4</version> </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>2.6.2</version> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java index 97643aa..434d73f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java @@ -17,13 +17,18 @@ package org.apache.nifi.avro; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.avro.Schema; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schemaregistry.services.SchemaRegistry; @@ -35,7 +40,6 @@ import org.apache.nifi.serialization.record.RecordSchema; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -46,15 +50,32 @@ import java.util.Optional; public class AvroReader extends SchemaRegistryService implements RecordReaderFactory { private 
final AllowableValue EMBEDDED_AVRO_SCHEMA = new AllowableValue("embedded-avro-schema", "Use Embedded Avro Schema", "The FlowFile has the Avro Schema embedded within the content, and this schema will be used."); - private static final int MAX_AVRO_SCHEMA_CACHE_SIZE = 20; - private final Map<String, Schema> compiledAvroSchemaCache = new LinkedHashMap<String, Schema>() { - @Override - protected boolean removeEldestEntry(final Map.Entry<String, Schema> eldest) { - return size() >= MAX_AVRO_SCHEMA_CACHE_SIZE; - } - }; + static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() + .name("cache-size") + .displayName("Cache Size") + .description("Specifies how many Schemas should be cached") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("1000") + .required(true) + .build(); + private LoadingCache<String, Schema> compiledAvroSchemaCache; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(CACHE_SIZE); + return properties; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final int cacheSize = context.getProperty(CACHE_SIZE).asInteger(); + compiledAvroSchemaCache = Caffeine.newBuilder() + .maximumSize(cacheSize) + .build(schemaText -> new Schema.Parser().parse(schemaText)); + } @Override protected List<AllowableValue> getSchemaAccessStrategyValues() { @@ -94,7 +115,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac if (recordSchema.getSchemaFormat().isPresent() & recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) { final Optional<String> textOption = recordSchema.getSchemaText(); if (textOption.isPresent()) { - avroSchema = compileAvroSchema(textOption.get()); + avroSchema = compiledAvroSchemaCache.get(textOption.get()); } else { avroSchema = 
AvroTypeUtil.extractAvroSchema(recordSchema); } @@ -109,29 +130,6 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac } } - private Schema compileAvroSchema(final String text) { - // Access to the LinkedHashMap must be done while synchronized on this. - // However, if no compiled schema exists, we don't want to remain synchronized - // while we compile it, as compilation can be expensive. As a result, if there is - // not a compiled schema already, we will compile it outside of the synchronized - // block, and then re-synchronize to update the map. All of this is functionally - // equivalent to calling compiledAvroSchema.computeIfAbsent(text, t -> new Schema.Parser().parse(t)); - // but does so without synchronizing when not necessary. - Schema compiled; - synchronized (this) { - compiled = compiledAvroSchemaCache.get(text); - } - - if (compiled != null) { - return compiled; - } - - final Schema newlyCompiled = new Schema.Parser().parse(text); - synchronized (this) { - return compiledAvroSchemaCache.computeIfAbsent(text, t -> newlyCompiled); - } - } - @Override protected AllowableValue getDefaultSchemaAccessStrategy() { return EMBEDDED_AVRO_SCHEMA; http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index 7e49841..93d36dc 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -22,22 +22,25 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; @@ -49,7 +52,6 @@ import org.apache.nifi.serialization.record.RecordSchema; @CapabilityDescription("Writes the contents of a RecordSet in Binary Avro format.") public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory { private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); - private static 
final int MAX_AVRO_SCHEMA_CACHE_SIZE = 20; private enum CodecType { BZIP2, @@ -68,16 +70,28 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement .required(true) .build(); - private final Map<String, Schema> compiledAvroSchemaCache = new LinkedHashMap<String, Schema>() { - @Override - protected boolean removeEldestEntry(final Map.Entry<String, Schema> eldest) { - return size() >= MAX_AVRO_SCHEMA_CACHE_SIZE; - } - }; + private LoadingCache<String, Schema> compiledAvroSchemaCache; static final AllowableValue AVRO_EMBEDDED = new AllowableValue("avro-embedded", "Embed Avro Schema", "The FlowFile will have the Avro schema embedded into the content, as is typical with Avro"); + static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() + .name("cache-size") + .displayName("Cache Size") + .description("Specifies how many Schemas should be cached") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("1000") + .required(true) + .build(); + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final int cacheSize = context.getProperty(CACHE_SIZE).asInteger(); + compiledAvroSchemaCache = Caffeine.newBuilder() + .maximumSize(cacheSize) + .build(schemaText -> new Schema.Parser().parse(schemaText)); + } + @Override public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final OutputStream out) throws IOException { final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue(); @@ -89,7 +103,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement if (recordSchema.getSchemaFormat().isPresent() && recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) { final Optional<String> textOption = recordSchema.getSchemaText(); if (textOption.isPresent()) { - avroSchema = compileAvroSchema(textOption.get()); + avroSchema = 
compiledAvroSchemaCache.get(textOption.get()); } else { avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema); } @@ -110,30 +124,6 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement } } - - private Schema compileAvroSchema(final String text) { - // Access to the LinkedHashMap must be done while synchronized on this. - // However, if no compiled schema exists, we don't want to remain synchronized - // while we compile it, as compilation can be expensive. As a result, if there is - // not a compiled schema already, we will compile it outside of the synchronized - // block, and then re-synchronize to update the map. All of this is functionally - // equivalent to calling compiledAvroSchema.computeIfAbsent(text, t -> new Schema.Parser().parse(t)); - // but does so without synchronizing when not necessary. - Schema compiled; - synchronized (this) { - compiled = compiledAvroSchemaCache.get(text); - } - - if (compiled != null) { - return compiled; - } - - final Schema newlyCompiled = new Schema.Parser().parse(text); - synchronized (this) { - return compiledAvroSchemaCache.computeIfAbsent(text, t -> newlyCompiled); - } - } - private CodecFactory getCodecFactory(String property) { CodecType type = CodecType.valueOf(property); switch (type) { @@ -155,6 +145,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); properties.add(COMPRESSION_FORMAT); + properties.add(CACHE_SIZE); return properties; } http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/pom.xml 
b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/pom.xml index d609faa..42dde93 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/pom.xml +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/pom.xml @@ -38,6 +38,11 @@ <version>3.8.1</version> </dependency> <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>2.6.2</version> + </dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/765df678/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index cee8d22..9b5a98a 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.processors.attributes; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; @@ -65,7 +67,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import 
java.util.HashSet; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -97,12 +98,18 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { private final static Set<Relationship> statelessRelationshipSet; private final static Set<Relationship> statefulRelationshipSet; - private final Map<String, String> canonicalValueLookup = new LinkedHashMap<String, String>() { - @Override - protected boolean removeEldestEntry(final Map.Entry eldest) { - return size() > 100; - } - }; + /** + * This field caches a 'canonical' value for a given attribute value. When this processor is used to update an attribute or add a new + * attribute, if Expression Language is used, we may well end up with a new String object for each attribute for each FlowFile. As a result, + * we will store a different String object for the attribute value of every FlowFile, meaning that we have to keep a lot of String objects + * in heap. By using this 'canonical lookup', we are able to keep only a single String object on the heap. + * + * For example, if we have a property named "abc" and the value is "${abc}${xyz}", and we send through 1,000 FlowFiles with attributes abc="abc" + * and xyz="xyz", then would end up with 1,000 String objects with a value of "abcxyz". By using this canonical representation, we are able to + * instead hold a single String whose value is "abcxyz" instead of holding 1,000 String objects in heap (1,000 String objects may still be created + * when calling PropertyValue.evaluateAttributeExpressions, but this way those values are garbage collected). 
+ */ + private LoadingCache<String, String> canonicalValueLookup; // relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -185,6 +192,15 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { .addValidator(Validator.VALID) .build(); + public static final PropertyDescriptor CANONICAL_VALUE_LOOKUP_CACHE_SIZE = new PropertyDescriptor.Builder() + .name("canonical-value-lookup-cache-size") + .displayName("Cache Value Lookup Cache Size") + .description("Specifies how many canonical lookup values should be stored in the cache") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("100") + .required(true) + .build(); + private volatile Map<String, Action> defaultActions; private volatile boolean debugEnabled; private volatile boolean stateful = false; @@ -205,6 +221,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { descriptors.add(DELETE_ATTRIBUTES); descriptors.add(STORE_STATE); descriptors.add(STATEFUL_VARIABLES_INIT_VALUE); + descriptors.add(CANONICAL_VALUE_LOOKUP_CACHE_SIZE); return Collections.unmodifiableList(descriptors); } @@ -245,6 +262,11 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { @OnScheduled public void onScheduled(final ProcessContext context) throws IOException { + final int cacheSize = context.getProperty(CANONICAL_VALUE_LOOKUP_CACHE_SIZE).asInteger(); + canonicalValueLookup = Caffeine.newBuilder() + .maximumSize(cacheSize) + .build(attributeValue -> attributeValue); + criteriaCache.set(CriteriaSerDe.deserialize(context.getAnnotationData())); propertyValues.clear(); @@ -594,30 +616,6 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { } } - /** - * This method caches a 'canonical' value for a given attribute value. 
When this processor is used to update an attribute or add a new - * attribute, if Expression Language is used, we may well end up with a new String object for each attribute for each FlowFile. As a result, - * we will store a different String object for the attribute value of every FlowFile, meaning that we have to keep a lot of String objects - * in heap. By using this 'canonical lookup', we are able to keep only a single String object on the heap. - * - * For example, if we have a property named "abc" and the value is "${abc}${xyz}", and we send through 1,000 FlowFiles with attributes abc="abc" - * and xyz="xyz", then would end up with 1,000 String objects with a value of "abcxyz". By using this canonical representation, we are able to - * instead hold a single String whose value is "abcxyz" instead of holding 1,000 String objects in heap (1,000 String objects may still be created - * when calling PropertyValue.evaluateAttributeExpressions, but this way those values are garbage collected). - * - * @param attributeValue the value whose canonical value should be return - * @return the canonical representation of the given attribute value - */ - private synchronized String getCanonicalRepresentation(final String attributeValue) { - final String canonical = this.canonicalValueLookup.get(attributeValue); - if (canonical != null) { - return canonical; - } - - this.canonicalValueLookup.put(attributeValue, attributeValue); - return attributeValue; - } - // Executes the specified action on the specified flowfile. 
private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List<Rule> rules, final Map<String, Action> defaultActions, final FlowFile flowfile, final Map<String, String> stateInitialAttributes, final Map<String, String> stateWorkingAttributes) { @@ -688,7 +686,7 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { if (notDeleted || setStatefulAttribute) { try { String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, stateInitialAttributes).getValue(); - newAttributeValue = getCanonicalRepresentation(newAttributeValue); + newAttributeValue = canonicalValueLookup.get(newAttributeValue); // log if appropriate if (debugEnabled) { @@ -746,7 +744,8 @@ public class UpdateAttribute extends AbstractProcessor implements Searchable { final Map<String, Action> defaultActions = new HashMap<>(); for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { - if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE) { + if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE + && entry.getKey() != CANONICAL_VALUE_LOOKUP_CACHE_SIZE) { final Action action = new Action(); action.setAttribute(entry.getKey().getName()); action.setValue(entry.getValue());
