NIFI-1316: Add an option to DetectDuplicate to not cache the entry identifier
Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6b54753d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6b54753d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6b54753d Branch: refs/heads/NIFI-259 Commit: 6b54753dbb9bf6b2694a0cee7ac485fdcc8c3d01 Parents: 0c68e2c Author: jpercivall <[email protected]> Authored: Mon Dec 21 16:05:13 2015 -0500 Committer: Aldrin Piri <[email protected]> Committed: Mon Jan 11 22:59:24 2016 -0500 ---------------------------------------------------------------------- .../processors/standard/DetectDuplicate.java | 33 ++++++++- .../standard/TestDetectDuplicate.java | 78 +++++++++++++++++++- 2 files changed, 104 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6b54753d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java index 39dc725..71195d9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DetectDuplicate.java @@ -56,7 +56,7 @@ import org.apache.nifi.processor.util.StandardValidators; @Tags({"hash", "dupe", "duplicate", "dedupe"}) @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Caches a value, computed from FlowFile attributes, for each incoming 
FlowFile and determines if the cached value has already been seen. " - + "If so, routes the FlowFile to 'duplicate' with an attribute named 'original.identifier' that specifies the original FlowFile's" + + "If so, routes the FlowFile to 'duplicate' with an attribute named 'original.identifier' that specifies the original FlowFile's " + "\"description\", which is specified in the <FlowFile Description> property. If the FlowFile is not determined to be a duplicate, the Processor " + "routes the FlowFile to 'non-duplicate'") @WritesAttribute(attribute = "original.flowfile.description", description = "All FlowFiles routed to the duplicate relationship will have " @@ -101,6 +101,16 @@ public class DetectDuplicate extends AbstractProcessor { .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); + public static final PropertyDescriptor CACHE_IDENTIFIER = new PropertyDescriptor.Builder() + .name("Cache The Entry Identifier") + .description("When true this cause the processor to check for duplicates and cache the Entry Identifier. 
When false, " + + "the processor would only check for duplicates and not cache the Entry Identifier, requiring another " + + "processor to add identifiers to the distributed cache.") + .required(false) + .allowableValues("true","false") + .defaultValue("true") + .build(); + public static final Relationship REL_DUPLICATE = new Relationship.Builder() .name("duplicate") .description("If a FlowFile has been detected to be a duplicate, it will be routed to this relationship") @@ -134,6 +144,7 @@ public class DetectDuplicate extends AbstractProcessor { descriptors.add(FLOWFILE_DESCRIPTION); descriptors.add(AGE_OFF_DURATION); descriptors.add(DISTRIBUTED_CACHE_SERVICE); + descriptors.add(CACHE_IDENTIFIER); return descriptors; } @@ -164,14 +175,28 @@ public class DetectDuplicate extends AbstractProcessor { try { final String flowFileDescription = context.getProperty(FLOWFILE_DESCRIPTION).evaluateAttributeExpressions(flowFile).getValue(); final CacheValue cacheValue = new CacheValue(flowFileDescription, now); - final CacheValue originalCacheValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer); + final CacheValue originalCacheValue; + + final boolean shouldCacheIdentifier = context.getProperty(CACHE_IDENTIFIER).asBoolean(); + if (shouldCacheIdentifier) { + originalCacheValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer); + } else { + originalCacheValue = cache.get(cacheKey, keySerializer, valueDeserializer); + } + boolean duplicate = originalCacheValue != null; if (duplicate && durationMS != null && (now >= originalCacheValue.getEntryTimeMS() + durationMS)) { boolean status = cache.remove(cacheKey, keySerializer); logger.debug("Removal of expired cached entry with key {} returned {}", new Object[]{cacheKey, status}); - // this should typically result in duplicate being false...but, better safe than sorry - duplicate = !cache.putIfAbsent(cacheKey, cacheValue, keySerializer, 
valueSerializer); + + // both should typically result in duplicate being false...but, better safe than sorry + if (shouldCacheIdentifier) { + duplicate = !cache.putIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer); + } else { + duplicate = cache.containsKey(cacheKey, keySerializer); + } } + if (duplicate) { session.getProvenanceReporter().route(flowFile, REL_DUPLICATE, "Duplicate of: " + ORIGINAL_DESCRIPTION_ATTRIBUTE_NAME); String originalFlowFileDescription = originalCacheValue.getDescription(); http://git-wip-us.apache.org/repos/asf/nifi/blob/6b54753d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index 12a5cd4..54e6a29 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@ -73,7 +73,6 @@ public class TestDetectDuplicate { runner.run(); runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); runner.clearTransferState(); - client.exists = true; runner.enqueue(new byte[]{}, props); runner.run(); runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_DUPLICATE, 1); @@ -101,7 +100,6 @@ public class TestDetectDuplicate { runner.run(); runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); runner.clearTransferState(); - client.exists = true; Thread.sleep(3000); runner.enqueue(new byte[]{}, props); runner.run(); @@ -120,6 +118,72 
@@ public class TestDetectDuplicate { return client; } + @Test + public void testDuplicateNoCache() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); + final DistributedMapCacheClientImpl client = createClient(); + final Map<String, String> clientProperties = new HashMap<>(); + clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); + runner.addControllerService("client", client, clientProperties); + runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); + runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); + runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "48 hours"); + runner.setProperty(DetectDuplicate.CACHE_IDENTIFIER, "false"); + final Map<String, String> props = new HashMap<>(); + props.put("hash.value", "1000"); + runner.enqueue(new byte[]{}, props); + runner.enableControllerService(client); + + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); + runner.clearTransferState(); + + runner.setProperty(DetectDuplicate.CACHE_IDENTIFIER, "true"); + runner.enqueue(new byte[]{}, props); + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); + runner.assertTransferCount(DetectDuplicate.REL_DUPLICATE, 0); + runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); + runner.clearTransferState(); + + runner.enqueue(new byte[]{}, props); + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_DUPLICATE, 1); + runner.assertTransferCount(DetectDuplicate.REL_NON_DUPLICATE, 0); + runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); + } + + @Test + public void testDuplicateNoCacheWithAgeOff() throws InitializationException, InterruptedException { + + final TestRunner runner = TestRunners.newTestRunner(DetectDuplicate.class); + final DistributedMapCacheClientImpl client = createClient(); + final Map<String, String> 
clientProperties = new HashMap<>(); + clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); + runner.addControllerService("client", client, clientProperties); + runner.setProperty(DetectDuplicate.DISTRIBUTED_CACHE_SERVICE, "client"); + runner.setProperty(DetectDuplicate.FLOWFILE_DESCRIPTION, "The original flow file"); + runner.setProperty(DetectDuplicate.AGE_OFF_DURATION, "2 secs"); + runner.enableControllerService(client); + + final Map<String, String> props = new HashMap<>(); + props.put("hash.value", "1000"); + runner.enqueue(new byte[]{}, props); + + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); + + runner.clearTransferState(); + Thread.sleep(3000); + + runner.setProperty(DetectDuplicate.CACHE_IDENTIFIER, "false"); + runner.enqueue(new byte[]{}, props); + runner.run(); + runner.assertAllFlowFilesTransferred(DetectDuplicate.REL_NON_DUPLICATE, 1); + runner.assertTransferCount(DetectDuplicate.REL_DUPLICATE, 0); + runner.assertTransferCount(DetectDuplicate.REL_FAILURE, 0); + } + static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient { boolean exists = false; @@ -150,6 +214,7 @@ public class TestDetectDuplicate { } cacheValue = value; + exists = true; return true; } @@ -160,6 +225,7 @@ public class TestDetectDuplicate { return (V) cacheValue; } cacheValue = value; + exists = true; return null; } @@ -170,7 +236,11 @@ public class TestDetectDuplicate { @Override public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { - return null; + if (exists) { + return (V) cacheValue; + } else { + return null; + } } @Override @@ -181,6 +251,8 @@ public class TestDetectDuplicate { @Override public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { + cacheValue = value; + exists = 
true; } }
