This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 4b691b133b NIFI-10192 Caffeine cache schema request for reuse
4b691b133b is described below
commit 4b691b133b231cdec04290afaf1fe433b6047717
Author: Aerilym <[email protected]>
AuthorDate: Fri Sep 9 12:47:49 2022 +1000
NIFI-10192 Caffeine cache schema request for reuse
This closes #6364
Signed-off-by: Mike Thomsen <[email protected]>
---
.../nifi/processors/standard/LookupRecord.java | 43 +++++++++++++++++++++-
1 file changed, 41 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 7301c26f63..7e4285ad2f 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -17,6 +17,8 @@
package org.apache.nifi.processors.standard;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -45,6 +47,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
@@ -194,6 +197,18 @@ public class LookupRecord extends AbstractProcessor {
.required(true)
.build();
+ static final PropertyDescriptor CACHE_SIZE = new
PropertyDescriptor.Builder()
+ .name("record-path-lookup-miss-result-cache-size")
+ .displayName("Cache Size")
+ .description("Specifies how many lookup values/records should be
cached."
+ + "Setting this property to zero means no caching will be done
and the table will be queried for each lookup value in each record. If the
lookup "
+ + "table changes often or the most recent data must be
retrieved, do not use the cache.")
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("0")
+ .required(true)
+ .build();
+
static final Relationship REL_MATCHED = new Relationship.Builder()
.name("matched")
.description("All records for which the lookup returns a value will be
routed to this relationship")
@@ -238,6 +253,7 @@ public class LookupRecord extends AbstractProcessor {
properties.add(ROUTING_STRATEGY);
properties.add(RESULT_CONTENTS);
properties.add(REPLACEMENT_STRATEGY);
+ properties.add(CACHE_SIZE);
return properties;
}
@@ -451,7 +467,7 @@ public class LookupRecord extends AbstractProcessor {
if (isInPlaceReplacement) {
return new InPlaceReplacementStrategy();
} else {
- return new RecordPathReplacementStrategy();
+ return new RecordPathReplacementStrategy(context);
}
}
@@ -536,6 +552,19 @@ public class LookupRecord extends AbstractProcessor {
private class RecordPathReplacementStrategy implements ReplacementStrategy
{
private int lookupCount = 0;
+ private volatile Cache<Map<String, Object>, Optional<?>> cache;
+
+ public RecordPathReplacementStrategy(ProcessContext context) {
+
+ final int cacheSize =
context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
+
+ if (this.cache == null || cacheSize > 0) {
+ this.cache = Caffeine.newBuilder()
+ .maximumSize(cacheSize)
+ .build();
+ }
+ }
+
@Override
public Set<Relationship> lookup(final Record record, final
ProcessContext context, final LookupContext lookupContext) {
lookupCount++;
@@ -548,8 +577,15 @@ public class LookupRecord extends AbstractProcessor {
final FlowFile flowFile = lookupContext.getOriginalFlowFile();
final Optional<?> lookupValueOption;
+ final Optional<?> lookupValueCacheOption;
+
try {
- lookupValueOption = lookupService.lookup(lookupCoordinates,
flowFile.getAttributes());
+ lookupValueCacheOption = (Optional<?>)
cache.get(lookupCoordinates, k -> null);
+ if (lookupValueCacheOption == null) {
+ lookupValueOption =
lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
+ } else {
+ lookupValueOption = lookupValueCacheOption;
+ }
} catch (final Exception e) {
throw new ProcessException("Failed to lookup coordinates " +
lookupCoordinates + " in Lookup Service", e);
}
@@ -634,6 +670,9 @@ public class LookupRecord extends AbstractProcessor {
}
final Optional<?> lookupResult =
lookupService.lookup(lookupCoordinates, flowFileAttributes);
+
+ cache.put(lookupCoordinates, lookupResult);
+
if (!lookupResult.isPresent()) {
continue;
}