Repository: nifi
Updated Branches:
  refs/heads/master 71cd497fe -> 7f8987471


NIFI-3946: Update LookupService to take a Map instead of a String for the input
NIFI-3946: Fixed issues where null values were returned instead of empty 
optionals

This closes #1833.

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7f898747
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7f898747
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7f898747

Branch: refs/heads/master
Commit: 7f8987471d18deececd71bed0cbdee28b9c6254f
Parents: 71cd497
Author: Mark Payne <[email protected]>
Authored: Fri May 19 16:42:51 2017 -0400
Committer: Bryan Bende <[email protected]>
Committed: Mon May 22 11:52:41 2017 -0400

----------------------------------------------------------------------
 .../lookup/script/ScriptedLookupService.java    |  14 +-
 .../script/TestScriptedLookupService.groovy     |   6 +-
 .../resources/groovy/test_lookup_inline.groovy  |  11 +-
 .../nifi/processors/standard/LookupRecord.java  | 145 ++++++++++++++-----
 .../processors/standard/TestLookupRecord.java   |  38 ++++-
 .../org/apache/nifi/lookup/LookupService.java   |  23 ++-
 .../apache/nifi/lookup/RecordLookupService.java |  11 +-
 .../apache/nifi/lookup/StringLookupService.java |   9 +-
 .../lookup/SimpleKeyValueLookupService.java     |  23 ++-
 .../nifi/lookup/maxmind/IPLookupService.java    |  85 ++++++-----
 10 files changed, 265 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7f898747/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java
 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java
index da846ec..ca79ba9 100644
--- 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java
+++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java
@@ -46,7 +46,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -64,9 +66,14 @@ public class ScriptedLookupService extends 
AbstractScriptedControllerService imp
     private volatile File kerberosServiceKeytab = null;
 
     @Override
-    public Optional<Object> lookup(String key) throws LookupFailureException {
+    public Optional<Object> lookup(Map<String, String> coordinates) throws 
LookupFailureException {
         // Delegate the lookup() call to the scripted LookupService
-        return lookupService.get().lookup(key);
+        return lookupService.get().lookup(coordinates);
+    }
+
+    @Override
+    public Set<String> getRequiredKeys() {
+        return lookupService.get().getRequiredKeys();
     }
 
     @Override
@@ -177,6 +184,7 @@ public class ScriptedLookupService extends 
AbstractScriptedControllerService imp
         }
     }
 
+    @Override
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) {
         synchronized (scriptingComponentHelper.isInitialized) {
@@ -236,6 +244,7 @@ public class ScriptedLookupService extends 
AbstractScriptedControllerService imp
         }
     }
 
+    @Override
     public void setup() {
         // Create a single script engine, the Processor object is reused by 
each task
         if (scriptEngine == null) {
@@ -263,6 +272,7 @@ public class ScriptedLookupService extends 
AbstractScriptedControllerService imp
      * @param scriptBody An input stream associated with the script content
      * @return Whether the script was successfully reloaded
      */
+    @Override
     protected boolean reloadScript(final String scriptBody) {
         // note we are starting here with a fresh listing of validation
         // results since we are (re)loading a new/updated script. any

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f898747/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/lookup/script/TestScriptedLookupService.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/lookup/script/TestScriptedLookupService.groovy
 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/lookup/script/TestScriptedLookupService.groovy
index 439b37d..1dc903f 100644
--- 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/lookup/script/TestScriptedLookupService.groovy
+++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/lookup/script/TestScriptedLookupService.groovy
@@ -92,13 +92,13 @@ class TestScriptedLookupService {
         MockFlowFile mockFlowFile = new MockFlowFile(1L)
         InputStream inStream = new ByteArrayInputStream('Flow file content not 
used'.bytes)
 
-        Optional opt = scriptedLookupService.lookup('Hello')
+        Optional opt = scriptedLookupService.lookup(['key':'Hello'])
         assertTrue(opt.present)
         assertEquals('Hi', opt.get())
-        opt = scriptedLookupService.lookup('World')
+        opt = scriptedLookupService.lookup(['key':'World'])
         assertTrue(opt.present)
         assertEquals('there', opt.get())
-        opt = scriptedLookupService.lookup('Not There')
+        opt = scriptedLookupService.lookup(['key':'Not There'])
         assertFalse(opt.present)
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f898747/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy
 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy
index 273ccb9..8285b62 100644
--- 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy
+++ 
b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 
+import java.util.Set
+
 import org.apache.nifi.controller.ControllerServiceInitializationContext
 import org.apache.nifi.reporting.InitializationException
 
@@ -28,10 +30,15 @@ class GroovyLookupService implements LookupService<String> {
 
 
     @Override
-    Optional<String> lookup(String key) {
+    Optional<String> lookup(Map<String, String> coordinates) {
+        final String key = coordinates.values().iterator().next();
         Optional.ofNullable(lookupTable[key])
     }
-
+    
+    Set<String> getRequiredKeys() {
+        return java.util.Collections.emptySet();
+    }
+    
     @Override
     Class<?> getValueType() {
         return String

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f898747/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
index 583ea51..4658c95 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java
@@ -19,26 +19,31 @@ package org.apache.nifi.processors.standard;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.lookup.LookupService;
 import org.apache.nifi.processor.ProcessContext;
@@ -64,18 +69,19 @@ import org.apache.nifi.util.Tuple;
     @WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile")
 })
 @Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", 
"logs", "convert", "filter"})
-@CapabilityDescription("Extracts a field from a Record and looks up its value 
in a LookupService. If a result is returned by the LookupService, "
+@CapabilityDescription("Extracts one or more fields from a Record and looks up 
a value for those fields in a LookupService. If a result is returned by the 
LookupService, "
     + "that result is optionally added to the Record. In this case, the 
processor functions as an Enrichment processor. Regardless, the Record is then "
     + "routed to either the 'matched' relationship or 'unmatched' relationship 
(if the 'Routing Strategy' property is configured to do so), "
-    + "indicating whether or not a result was returned by the LookupService, "
-    + "allowing the processor to also function as a Routing processor. If any 
record in the incoming FlowFile has multiple fields match the configured "
-    + "Lookup RecordPath or if no fields match, then that record will be 
routed to 'unmatched' (or 'success', depending on the configuration of the 
'Routing Strategy' property). "
+    + "indicating whether or not a result was returned by the LookupService, 
allowing the processor to also function as a Routing processor. "
+    + "The \"coordinates\" to use for looking up a value in the Lookup Service 
are defined by adding a user-defined property. Each property that is added will 
have an entry added "
+    + "to a Map, where the name of the property becomes the Map Key and the 
value returned by the RecordPath becomes the value for that key. If multiple 
values are returned by the "
+    + "RecordPath, then the Record will be routed to the 'unmatched' 
relationship (or 'success', depending on the 'Routing Strategy' property's 
configuration). "
     + "If one or more fields match the Result RecordPath, all fields "
     + "that match will be updated. If there is no match in the configured 
LookupService, then no fields will be updated. I.e., it will not overwrite an 
existing value in the Record "
     + "with a null value. Please note, however, that if the results returned 
by the LookupService are not accounted for in your schema (specifically, "
     + "the schema that is configured for your Record Writer) then the fields 
will not be written out to the FlowFile.")
 @SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, classNames = 
{"org.apache.nifi.lookup.SimpleKeyValueLookupService", 
"org.apache.nifi.lookup.maxmind.IPLookupService"})
-public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, 
RecordPath>> {
+public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, 
RecordPath>, RecordPath>> {
 
     private volatile RecordPathCache recordPathCache = new RecordPathCache(25);
     private volatile LookupService<?> lookupService;
@@ -94,15 +100,6 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<RecordPath, RecordPa
         .required(true)
         .build();
 
-    static final PropertyDescriptor LOOKUP_RECORD_PATH = new 
PropertyDescriptor.Builder()
-        .name("lookup-record-path")
-        .displayName("Lookup RecordPath")
-        .description("A RecordPath that points to the field whose value will 
be looked up in the configured Lookup Service")
-        .addValidator(new RecordPathValidator())
-        .expressionLanguageSupported(true)
-        .required(true)
-        .build();
-
     static final PropertyDescriptor RESULT_RECORD_PATH = new 
PropertyDescriptor.Builder()
         .name("result-record-path")
         .displayName("Result RecordPath")
@@ -159,13 +156,64 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<RecordPath, RecordPa
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.addAll(super.getSupportedPropertyDescriptors());
         properties.add(LOOKUP_SERVICE);
-        properties.add(LOOKUP_RECORD_PATH);
         properties.add(RESULT_RECORD_PATH);
         properties.add(ROUTING_STRATEGY);
         return properties;
     }
 
     @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .description("A RecordPath that points to the field whose value 
will be looked up in the configured Lookup Service")
+            .addValidator(new RecordPathValidator())
+            .expressionLanguageSupported(true)
+            .required(false)
+            .dynamic(true)
+            .build();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final Set<String> dynamicPropNames = 
validationContext.getProperties().keySet().stream()
+            .filter(prop -> prop.isDynamic())
+            .map(prop -> prop.getName())
+            .collect(Collectors.toSet());
+
+        if (dynamicPropNames.isEmpty()) {
+            return Collections.singleton(new ValidationResult.Builder()
+                .subject("User-Defined Properties")
+                .valid(false)
+                .explanation("At least one user-defined property must be 
specified.")
+                .build());
+        }
+
+        final Set<String> requiredKeys = 
validationContext.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys();
+        final Set<String> missingKeys = requiredKeys.stream()
+            .filter(key -> !dynamicPropNames.contains(key))
+            .collect(Collectors.toSet());
+
+        if (!missingKeys.isEmpty()) {
+            final List<ValidationResult> validationResults = new ArrayList<>();
+            for (final String missingKey : missingKeys) {
+                final ValidationResult result = new ValidationResult.Builder()
+                    .subject(missingKey)
+                    .valid(false)
+                    .explanation("The configured Lookup Services requires that 
a key be provided with the name '" + missingKey
+                        + "'. Please add a new property to this Processor with 
a name '" + missingKey
+                        + "' and provide a RecordPath that can be used to 
retrieve the appropriate value.")
+                    .build();
+                validationResults.add(result);
+            }
+
+            return validationResults;
+        }
+
+        return Collections.emptyList();
+    }
+
+    @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (ROUTING_STRATEGY.equals(descriptor)) {
             if 
(ROUTE_TO_MATCHED_UNMATCHED.getValue().equalsIgnoreCase(newValue)) {
@@ -189,33 +237,43 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<RecordPath, RecordPa
 
     @Override
     protected Set<Relationship> route(final Record record, final RecordSchema 
writeSchema, final FlowFile flowFile, final ProcessContext context,
-        final Tuple<RecordPath, RecordPath> flowFileContext) {
+        final Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
 
-        final RecordPathResult lookupPathResult = 
flowFileContext.getKey().evaluate(record);
-        final List<FieldValue> lookupFieldValues = 
lookupPathResult.getSelectedFields()
-            .filter(fieldVal -> fieldVal.getValue() != null)
-            .collect(Collectors.toList());
+        final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
+        final Map<String, String> lookupCoordinates = new 
HashMap<>(recordPaths.size());
 
-        if (lookupFieldValues.isEmpty()) {
-            final Set<Relationship> rels = routeToMatchedUnmatched ? 
UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
-            getLogger().debug("Lookup RecordPath did not match any fields in a 
record for {}; routing record to " + rels, new Object[] {flowFile});
-            return rels;
-        }
+        for (final Map.Entry<String, RecordPath> entry : 
recordPaths.entrySet()) {
+            final String coordinateKey = entry.getKey();
+            final RecordPath recordPath = entry.getValue();
 
-        if (lookupFieldValues.size() > 1) {
-            final Set<Relationship> rels = routeToMatchedUnmatched ? 
UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
-            getLogger().debug("Lookup RecordPath matched {} fields in a record 
for {}; routing record to " + rels, new Object[] {lookupFieldValues.size(), 
flowFile});
-            return rels;
-        }
+            final RecordPathResult pathResult = recordPath.evaluate(record);
+            final List<FieldValue> lookupFieldValues = 
pathResult.getSelectedFields()
+                .filter(fieldVal -> fieldVal.getValue() != null)
+                .collect(Collectors.toList());
 
-        final FieldValue fieldValue = lookupFieldValues.get(0);
-        final String lookupKey = DataTypeUtils.toString(fieldValue.getValue(), 
(String) null);
+            if (lookupFieldValues.isEmpty()) {
+                final Set<Relationship> rels = routeToMatchedUnmatched ? 
UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
+                getLogger().debug("RecordPath for property '{}' did not match 
any fields in a record for {}; routing record to {}", new Object[] 
{coordinateKey, flowFile, rels});
+                return rels;
+            }
+
+            if (lookupFieldValues.size() > 1) {
+                final Set<Relationship> rels = routeToMatchedUnmatched ? 
UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
+                getLogger().debug("RecordPath for property '{}' matched {} 
fields in a record for {}; routing record to {}",
+                    new Object[] {coordinateKey, lookupFieldValues.size(), 
flowFile, rels});
+                return rels;
+            }
+
+            final FieldValue fieldValue = lookupFieldValues.get(0);
+            final String coordinateValue = 
DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+            lookupCoordinates.put(coordinateKey, coordinateValue);
+        }
 
         final Optional<?> lookupValue;
         try {
-            lookupValue = lookupService.lookup(lookupKey);
+            lookupValue = lookupService.lookup(lookupCoordinates);
         } catch (final Exception e) {
-            throw new ProcessException("Failed to lookup value '" + lookupKey 
+ "' in Lookup Service", e);
+            throw new ProcessException("Failed to lookup coordinates " + 
lookupCoordinates + " in Lookup Service", e);
         }
 
         if (!lookupValue.isPresent()) {
@@ -243,9 +301,17 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<RecordPath, RecordPa
     }
 
     @Override
-    protected Tuple<RecordPath, RecordPath> getFlowFileContext(final FlowFile 
flowFile, final ProcessContext context) {
-        final String lookupPathText = 
context.getProperty(LOOKUP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
-        final RecordPath lookupRecordPath = 
recordPathCache.getCompiled(lookupPathText);
+    protected Tuple<Map<String, RecordPath>, RecordPath> 
getFlowFileContext(final FlowFile flowFile, final ProcessContext context) {
+        final Map<String, RecordPath> recordPaths = new HashMap<>();
+        for (final PropertyDescriptor prop : context.getProperties().keySet()) 
{
+            if (!prop.isDynamic()) {
+                continue;
+            }
+
+            final String pathText = 
context.getProperty(prop).evaluateAttributeExpressions(flowFile).getValue();
+            final RecordPath lookupRecordPath = 
recordPathCache.getCompiled(pathText);
+            recordPaths.put(prop.getName(), lookupRecordPath);
+        }
 
         final RecordPath resultRecordPath;
         if (context.getProperty(RESULT_RECORD_PATH).isSet()) {
@@ -255,6 +321,7 @@ public class LookupRecord extends 
AbstractRouteRecord<Tuple<RecordPath, RecordPa
             resultRecordPath = null;
         }
 
-        return new Tuple<>(lookupRecordPath, resultRecordPath);
+        return new Tuple<>(recordPaths, resultRecordPath);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f898747/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
index d19ee43..b84f518 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java
@@ -17,10 +17,13 @@
 
 package org.apache.nifi.processors.standard;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.lookup.StringLookupService;
 import org.apache.nifi.reporting.InitializationException;
@@ -57,7 +60,7 @@ public class TestLookupRecord {
         runner.setProperty(LookupRecord.RECORD_READER, "reader");
         runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
         runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
-        runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/name");
+        runner.setProperty("lookup", "/name");
         runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
         runner.setProperty(LookupRecord.ROUTING_STRATEGY, 
LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
 
@@ -145,7 +148,7 @@ public class TestLookupRecord {
 
     @Test
     public void testLookupPathNotFound() throws InitializationException {
-        runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/other");
+        runner.setProperty("lookup", "/other");
 
         runner.enqueue("");
         runner.run();
@@ -197,7 +200,7 @@ public class TestLookupRecord {
         lookupService.addValue("Jane Doe", "Basketball");
         lookupService.addValue("Jimmy Doe", "Football");
 
-        runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/*");
+        runner.setProperty("lookup", "/*");
 
         runner.enqueue("");
         runner.run();
@@ -210,6 +213,19 @@ public class TestLookupRecord {
         out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
     }
 
+    @Test
+    public void testInvalidUnlessAllRequiredPropertiesAdded() throws 
InitializationException {
+        runner.removeProperty(new 
PropertyDescriptor.Builder().name("lookup").build());
+        runner.setProperty("hello", "/name");
+        runner.assertNotValid();
+
+        runner.setProperty("lookup", "xx");
+        runner.assertNotValid();
+
+        runner.setProperty("lookup", "/name");
+        runner.assertValid();
+    }
+
 
 
     private static class MapLookup extends AbstractControllerService 
implements StringLookupService {
@@ -225,9 +241,23 @@ public class TestLookupRecord {
         }
 
         @Override
-        public Optional<String> lookup(final String key) {
+        public Optional<String> lookup(final Map<String, String> coordinates) {
+            if (coordinates == null) {
+                return Optional.empty();
+            }
+
+            final String key = coordinates.get("lookup");
+            if (key == null) {
+                return Optional.empty();
+            }
+
             return Optional.ofNullable(values.get(key));
         }
+
+        @Override
+        public Set<String> getRequiredKeys() {
+            return Collections.singleton("lookup");
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f898747/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
index 00258b6..48ec173 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java
@@ -17,25 +17,36 @@
 
 package org.apache.nifi.lookup;
 
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import org.apache.nifi.controller.ControllerService;
 
 public interface LookupService<T> extends ControllerService {
 
     /**
-     * Looks up a value that corresponds to the given key
+     * Looks up a value that corresponds to the given map of information, 
referred to as lookup coordinates
      *
-     * @param key the key to lookup
-     * @return a value that corresponds to the given key
+     * @param coordinates a Map of key/value pairs that indicate the 
information that should be looked up
+     * @return a value that corresponds to the given coordinates
      *
-     * @throws LookupFailureException if unable to lookup a value for the 
given key
+     * @throws LookupFailureException if unable to lookup a value for the 
given coordinates
      */
-    Optional<T> lookup(String key) throws LookupFailureException;
+    Optional<T> lookup(Map<String, String> coordinates) throws 
LookupFailureException;
 
     /**
-     * @return the Class that represents the type of value that will be 
returned by {@link #lookup(String)}
+     * @return the Class that represents the type of value that will be 
returned by {@link #lookup(Map)}
      */
     Class<?> getValueType();
 
+    /**
+     * Many Lookup Services will require a specific set of information be 
passed in to the {@link #lookup(Map)} method.
+     * This method will return the Set of keys that must be present in the map 
that is passed to {@link #lookup(Map)} in order
+     * for the lookup to succeed.
+     *
+     * @return the keys that must be present in the map passed to {@link 
#lookup(Map)} in order to the lookup to succeed, or an empty set
+     *         if no specific keys are required.
+     */
+    Set<String> getRequiredKeys();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f898747/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
index fee2884..aefb880 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.lookup;
 
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.nifi.serialization.record.Record;
@@ -24,15 +25,15 @@ import org.apache.nifi.serialization.record.Record;
 public interface RecordLookupService extends LookupService<Record> {
 
     /**
-     * Returns an Optional Record that corresponds to the given key
+     * Returns an Optional Record that corresponds to the given coordinates
      *
-     * @param key the key to lookup
-     * @return an Optional Record that corresponds to the given key
+     * @param coordinates the coordinates to lookup
+     * @return an Optional Record that corresponds to the given coordinates
      *
-     * @throws LookupFailureException if unable to lookup a value for the 
given key
+     * @throws LookupFailureException if unable to lookup a value for the 
given coordinates
      */
     @Override
-    Optional<Record> lookup(String key) throws LookupFailureException;
+    Optional<Record> lookup(Map<String, String> coordinates) throws 
LookupFailureException;
 
     @Override
     default Class<?> getValueType() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f898747/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
index be7d7c8..aa2721bd 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
@@ -17,18 +17,19 @@
 
 package org.apache.nifi.lookup;
 
+import java.util.Map;
 import java.util.Optional;
 
 public interface StringLookupService extends LookupService<String> {
 
     /**
-     * Returns an Optional value that corresponds to the given key
+     * Returns an Optional value that corresponds to the given coordinates
      *
-     * @param key the key to lookup
-     * @return an Optional String that represents the value for the given key
+     * @param coordinates the coordinates to lookup
+     * @return an Optional String that represents the value for the given 
coordinates
      */
     @Override
-    Optional<String> lookup(String key);
+    Optional<String> lookup(Map<String, String> coordinates);
 
     @Override
     default Class<?> getValueType() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f898747/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java
index 59f7ca6..4ed75b2 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java
@@ -20,7 +20,9 @@ package org.apache.nifi.lookup;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -31,8 +33,11 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 
 @Tags({"lookup", "enrich", "key", "value"})
-@CapabilityDescription("Allows users to add key/value pairs as User-defined 
Properties. Each property that is added can be looked up by Property Name.")
+@CapabilityDescription("Allows users to add key/value pairs as User-defined 
Properties. Each property that is added can be looked up by Property Name. "
+    + "The coordinates that are passed to the lookup must contain the key 
'key'.")
 public class SimpleKeyValueLookupService extends AbstractControllerService 
implements StringLookupService {
+    private static final String KEY = "key";
+    private static final Set<String> REQUIRED_KEYS = 
Stream.of(KEY).collect(Collectors.toSet());
     private volatile Map<String, String> lookupValues = new HashMap<>();
 
     @Override
@@ -52,7 +57,21 @@ public class SimpleKeyValueLookupService extends 
AbstractControllerService imple
     }
 
     @Override
-    public Optional<String> lookup(final String key) {
+    public Optional<String> lookup(final Map<String, String> coordinates) {
+        if (coordinates == null) {
+            return Optional.empty();
+        }
+
+        final String key = coordinates.get(KEY);
+        if (key == null) {
+            return Optional.empty();
+        }
+
         return Optional.ofNullable(lookupValues.get(key));
     }
+
+    @Override
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7f898747/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
index 58ee4de..096f1a6 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
@@ -17,16 +17,23 @@
 
 package org.apache.nifi.lookup.maxmind;
 
-import com.maxmind.db.InvalidDatabaseException;
-import com.maxmind.geoip2.model.AnonymousIpResponse;
-import com.maxmind.geoip2.model.CityResponse;
-import com.maxmind.geoip2.model.ConnectionTypeResponse;
-import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType;
-import com.maxmind.geoip2.model.DomainResponse;
-import com.maxmind.geoip2.model.IspResponse;
-import com.maxmind.geoip2.record.Country;
-import com.maxmind.geoip2.record.Location;
-import com.maxmind.geoip2.record.Subdivision;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -42,28 +49,30 @@ import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.util.StopWatch;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import com.maxmind.db.InvalidDatabaseException;
+import com.maxmind.geoip2.model.AnonymousIpResponse;
+import com.maxmind.geoip2.model.CityResponse;
+import com.maxmind.geoip2.model.ConnectionTypeResponse;
+import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType;
+import com.maxmind.geoip2.model.DomainResponse;
+import com.maxmind.geoip2.model.IspResponse;
+import com.maxmind.geoip2.record.Country;
+import com.maxmind.geoip2.record.Location;
+import com.maxmind.geoip2.record.Subdivision;
+
 
 @Tags({"lookup", "enrich", "ip", "geo", "ipgeo", "maxmind", "isp", "domain", 
"cellular", "anonymous", "tor"})
 @CapabilityDescription("A lookup service that provides several types of 
enrichment information for IP addresses. The service is configured by providing 
a MaxMind "
-    + "Database file and specifying which types of enrichment should be 
provided for an IP Address. Each type of enrichment is a separate lookup, so 
configuring the "
-    + "service to provide all of the available enrichment data may be slower 
than returning only a portion of the available enrichments. View the Usage of 
this component "
+    + "Database file and specifying which types of enrichment should be 
provided for an IP Address or Hostname. Each type of enrichment is a separate 
lookup, so configuring the "
+    + "service to provide all of the available enrichment data may be slower 
than returning only a portion of the available enrichments. In order to use 
this service, a lookup "
+    + "must be performed using key of 'ip' and a value that is a valid IP 
address or hostname. View the Usage of this component "
     + "and choose to view Additional Details for more information, such as the 
Schema that pertains to the information that is returned.")
 public class IPLookupService extends AbstractControllerService implements 
RecordLookupService {
 
     private volatile String databaseFile = null;
+    private static final String IP_KEY = "ip";
+    private static final Set<String> REQUIRED_KEYS = 
Stream.of(IP_KEY).collect(Collectors.toSet());
+
     private volatile DatabaseReader databaseReader = null;
     private volatile String databaseChecksum = null;
     private volatile long databaseLastRefreshAttempt = -1;
@@ -175,8 +184,13 @@ public class IPLookupService extends 
AbstractControllerService implements Record
     }
 
     @Override
-    public Optional<Record> lookup(final String key) throws 
LookupFailureException {
-        if (key == null) {
+    public Set<String> getRequiredKeys() {
+        return REQUIRED_KEYS;
+    }
+
+    @Override
+    public Optional<Record> lookup(final Map<String, String> coordinates) 
throws LookupFailureException {
+        if (coordinates == null) {
             return Optional.empty();
         }
 
@@ -193,7 +207,7 @@ public class IPLookupService extends 
AbstractControllerService implements Record
         // InvalidDatabaseException, so force a reload and then retry the 
lookup one time, if we still get an error then throw it
         try {
             final DatabaseReader databaseReader = this.databaseReader;
-            return doLookup(databaseReader, key);
+            return doLookup(databaseReader, coordinates);
         } catch (InvalidDatabaseException idbe) {
             if (dbWriteLock.tryLock()) {
                 try {
@@ -210,7 +224,7 @@ public class IPLookupService extends 
AbstractControllerService implements Record
                     getLogger().debug("Attempting to retry lookup after 
InvalidDatabaseException");
                     try {
                         final DatabaseReader databaseReader = 
this.databaseReader;
-                        return doLookup(databaseReader, key);
+                        return doLookup(databaseReader, coordinates);
                     } catch (final Exception e) {
                         throw new LookupFailureException("Error performing 
look up: " + e.getMessage(), e);
                     }
@@ -218,18 +232,23 @@ public class IPLookupService extends 
AbstractControllerService implements Record
                     dbWriteLock.unlock();
                 }
             } else {
-                throw new LookupFailureException("Failed to lookup the key " + 
key + " due to " + idbe.getMessage(), idbe);
+                throw new LookupFailureException("Failed to lookup a value for 
" + coordinates + " due to " + idbe.getMessage(), idbe);
             }
         }
     }
 
-    private Optional<Record> doLookup(final DatabaseReader databaseReader, 
final String key) throws LookupFailureException, InvalidDatabaseException {
+    private Optional<Record> doLookup(final DatabaseReader databaseReader, 
final Map<String, String> coordinates) throws LookupFailureException, 
InvalidDatabaseException {
+        final String ipAddress = coordinates.get(IP_KEY);
+        if (ipAddress == null) {
+            return Optional.empty();
+        }
+
         final InetAddress inetAddress;
         try {
-            inetAddress = InetAddress.getByName(key);
+            inetAddress = InetAddress.getByName(ipAddress);
         } catch (final IOException ioe) {
             getLogger().warn("Could not resolve the IP for value '{}'. This is 
usually caused by issue resolving the appropriate DNS record or " +
-                    "providing the service with an invalid IP address", new 
Object[] {key}, ioe);
+                "providing the service with an invalid IP address", new 
Object[] {coordinates}, ioe);
 
             return Optional.empty();
         }

Reply via email to