Repository: nifi
Updated Branches:
  refs/heads/master 4303e4742 -> 5cacc52cf


NIFI-2661 - Abstract the GeoEnrichIP processor NIFI-2661 - Add accuracy radius 
to GeoEnrichIP NIFI-2661 - Introduce ISPEnrichIP Processor

Signed-off-by: Matt Burgess <[email protected]>

NIFI-2661 - Addresses PR comments

Signed-off-by: Matt Burgess <[email protected]>

NIFI-2661 - Addresses PR comments

Signed-off-by: Matt Burgess <[email protected]>

NIFI-2661: Fixed typos and reversed logging params

This closes #1650


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5cacc52c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5cacc52c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5cacc52c

Branch: refs/heads/master
Commit: 5cacc52cfc70877f262218297fd54bb3ace72ce3
Parents: 4303e47
Author: Andre F de Miranda <[email protected]>
Authored: Wed Apr 5 12:05:51 2017 +1000
Committer: Matt Burgess <[email protected]>
Committed: Wed Apr 26 09:10:48 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/AbstractEnrichIP.java       | 120 +++++++
 .../org/apache/nifi/processors/GeoEnrichIP.java | 113 ++-----
 .../org/apache/nifi/processors/ISPEnrichIP.java | 142 +++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../apache/nifi/processors/TestISPEnrichIP.java | 314 +++++++++++++++++++
 5 files changed, 597 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5cacc52c/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
new file mode 100644
index 0000000..f55e701
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.maxmind.DatabaseReader;
+import org.apache.nifi.util.StopWatch;
+
+public abstract class AbstractEnrichIP extends AbstractProcessor {
+
+    public static final PropertyDescriptor GEO_DATABASE_FILE = new 
PropertyDescriptor.Builder()
+            // Name has been left untouched so that we don't cause a breaking 
change
+            // but ideally this should be renamed to MaxMind Database File or 
something similar
+            .name("Geo Database File")
+            .displayName("MaxMind Database File")
+            .description("Path to Maxmind IP Enrichment Database File")
+            .required(true)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor IP_ADDRESS_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("IP Address Attribute")
+            .displayName("IP Address Attribute")
+            .required(true)
+            .description("The name of an attribute whose value is a dotted 
decimal IP address for which enrichment should occur")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
+            .build();
+
+    public static final Relationship REL_FOUND = new Relationship.Builder()
+            .name("found")
+            .description("Where to route flow files after successfully 
enriching attributes with data provided by database")
+            .build();
+
+    public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+            .name("not found")
+            .description("Where to route flow files after unsuccessfully 
enriching attributes because no data was found")
+            .build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> propertyDescriptors;
+    final AtomicReference<DatabaseReader> databaseReaderRef = new 
AtomicReference<>(null);
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException {
+        final String dbFileString = 
context.getProperty(GEO_DATABASE_FILE).getValue();
+        final File dbFile = new File(dbFileString);
+        final StopWatch stopWatch = new StopWatch(true);
+        final DatabaseReader reader = new 
DatabaseReader.Builder(dbFile).build();
+        stopWatch.stop();
+        getLogger().info("Completed loading of Maxmind Database.  Elapsed time 
was {} milliseconds.", new 
Object[]{stopWatch.getDuration(TimeUnit.MILLISECONDS)});
+        databaseReaderRef.set(reader);
+    }
+
+    @OnStopped
+    public void closeReader() throws IOException {
+        final DatabaseReader reader = databaseReaderRef.get();
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_FOUND);
+        rels.add(REL_NOT_FOUND);
+        this.relationships = Collections.unmodifiableSet(rels);
+
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(GEO_DATABASE_FILE);
+        props.add(IP_ADDRESS_ATTRIBUTE);
+        this.propertyDescriptors = Collections.unmodifiableList(props);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cacc52c/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
index c24799e..175d873 100644
--- 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java
@@ -16,18 +16,11 @@
  */
 package org.apache.nifi.processors;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
@@ -39,18 +32,10 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.maxmind.DatabaseReader;
 import org.apache.nifi.util.StopWatch;
 
@@ -70,6 +55,7 @@ import com.maxmind.geoip2.record.Subdivision;
 @WritesAttributes({
     @WritesAttribute(attribute = "X.geo.lookup.micros", description = "The 
number of microseconds that the geo lookup took"),
     @WritesAttribute(attribute = "X.geo.city", description = "The city 
identified for the IP address"),
+    @WritesAttribute(attribute = "X.geo.accuracy", description = "The accuracy 
radius if provided by the database (in Kilometers)"),
     @WritesAttribute(attribute = "X.geo.latitude", description = "The latitude 
identified for this IP address"),
     @WritesAttribute(attribute = "X.geo.longitude", description = "The 
longitude identified for this IP address"),
     @WritesAttribute(attribute = "X.geo.subdivision.N",
@@ -78,81 +64,7 @@ import com.maxmind.geoip2.record.Subdivision;
     @WritesAttribute(attribute = "X.geo.country", description = "The country 
identified for this IP address"),
     @WritesAttribute(attribute = "X.geo.country.isocode", description = "The 
ISO Code for the country identified"),
     @WritesAttribute(attribute = "X.geo.postalcode", description = "The postal 
code for the country identified"),})
-public class GeoEnrichIP extends AbstractProcessor {
-
-    public static final PropertyDescriptor GEO_DATABASE_FILE = new 
PropertyDescriptor.Builder()
-            .name("Geo Database File")
-            .displayName("Geo Database File")
-            .description("Path to Maxmind Geo Enrichment Database File")
-            .required(true)
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor IP_ADDRESS_ATTRIBUTE = new 
PropertyDescriptor.Builder()
-            .name("IP Address Attribute")
-            .displayName("IP Address Attribute")
-            .required(true)
-            .description("The name of an attribute whose value is a dotted 
decimal IP address for which enrichment should occur")
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
-            .build();
-
-    public static final Relationship REL_FOUND = new Relationship.Builder()
-            .name("found")
-            .description("Where to route flow files after successfully 
enriching attributes with geo data")
-            .build();
-
-    public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
-            .name("not found")
-            .description("Where to route flow files after unsuccessfully 
enriching attributes because no geo data was found")
-            .build();
-
-    private Set<Relationship> relationships;
-    private List<PropertyDescriptor> propertyDescriptors;
-    final AtomicReference<DatabaseReader> databaseReaderRef = new 
AtomicReference<>(null);
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propertyDescriptors;
-    }
-
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) throws IOException {
-        final String dbFileString = 
context.getProperty(GEO_DATABASE_FILE).getValue();
-        final File dbFile = new File(dbFileString);
-        final StopWatch stopWatch = new StopWatch(true);
-        final DatabaseReader reader = new 
DatabaseReader.Builder(dbFile).build();
-        stopWatch.stop();
-        getLogger().info("Completed loading of Maxmind Geo Database.  Elapsed 
time was {} milliseconds.", new 
Object[]{stopWatch.getDuration(TimeUnit.MILLISECONDS)});
-        databaseReaderRef.set(reader);
-    }
-
-    @OnStopped
-    public void closeReader() throws IOException {
-        final DatabaseReader reader = databaseReaderRef.get();
-        if (reader != null) {
-            reader.close();
-        }
-    }
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final Set<Relationship> rels = new HashSet<>();
-        rels.add(REL_FOUND);
-        rels.add(REL_NOT_FOUND);
-        this.relationships = Collections.unmodifiableSet(rels);
-
-        final List<PropertyDescriptor> props = new ArrayList<>();
-        props.add(GEO_DATABASE_FILE);
-        props.add(IP_ADDRESS_ATTRIBUTE);
-        this.propertyDescriptors = Collections.unmodifiableList(props);
-    }
+public class GeoEnrichIP extends AbstractEnrichIP {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
@@ -164,11 +76,14 @@ public class GeoEnrichIP extends AbstractProcessor {
         final DatabaseReader dbReader = databaseReaderRef.get();
         final String ipAttributeName = 
context.getProperty(IP_ADDRESS_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
         final String ipAttributeValue = flowFile.getAttribute(ipAttributeName);
-        if (StringUtils.isEmpty(ipAttributeName)) { //TODO need to add 
additional validation - should look like an IPv4 or IPv6 addr for instance
+
+        if (StringUtils.isEmpty(ipAttributeName)) {
             session.transfer(flowFile, REL_NOT_FOUND);
-            getLogger().warn("Unable to find ip address for {}", new 
Object[]{flowFile});
+            getLogger().warn("FlowFile '{}' attribute '{}' was empty. Routing 
to failure",
+                    new Object[]{flowFile, 
IP_ADDRESS_ATTRIBUTE.getDisplayName()});
             return;
         }
+
         InetAddress inetAddress = null;
         CityResponse response = null;
 
@@ -176,14 +91,21 @@ public class GeoEnrichIP extends AbstractProcessor {
             inetAddress = InetAddress.getByName(ipAttributeValue);
         } catch (final IOException ioe) {
             session.transfer(flowFile, REL_NOT_FOUND);
-            getLogger().warn("Could not resolve {} to ip address for {}", new 
Object[]{ipAttributeValue, flowFile}, ioe);
+            getLogger().warn("Could not resolve the IP for value '{}', 
contained within the attribute '{}' in " +
+                            "FlowFile '{}'. This is usually caused by issue 
resolving the appropriate DNS record or " +
+                            "providing the processor with an invalid IP 
address ",
+                            new Object[]{ipAttributeValue, 
IP_ADDRESS_ATTRIBUTE.getDisplayName(), flowFile}, ioe);
             return;
         }
+
         final StopWatch stopWatch = new StopWatch(true);
         try {
             response = dbReader.city(inetAddress);
             stopWatch.stop();
         } catch (final IOException | GeoIp2Exception ex) {
+            // Note IOException is captured again as dbReader also makes 
InetAddress.getByName() calls.
+            // Most name or IP resolutions failure should have been triggered 
in the try loop above but
+            // environmental conditions may trigger errors during the second 
resolution as well.
             session.transfer(flowFile, REL_NOT_FOUND);
             getLogger().warn("Failure while trying to find enrichment data for 
{} due to {}", new Object[]{flowFile, ex}, ex);
             return;
@@ -208,6 +130,11 @@ public class GeoEnrichIP extends AbstractProcessor {
             attrs.put(new 
StringBuilder(ipAttributeName).append(".geo.longitude").toString(), 
longitude.toString());
         }
 
+        final Integer accuracy = response.getLocation().getAccuracyRadius();
+        if (accuracy != null) {
+            attrs.put(new 
StringBuilder(ipAttributeName).append(".accuracy").toString(), 
String.valueOf(accuracy));
+        }
+
         int i = 0;
         for (final Subdivision subd : response.getSubdivisions()) {
             attrs.put(new 
StringBuilder(ipAttributeName).append(".geo.subdivision.").append(i).toString(),
 subd.getName());

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cacc52c/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/ISPEnrichIP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/ISPEnrichIP.java
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/ISPEnrichIP.java
new file mode 100644
index 0000000..fc159c9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/ISPEnrichIP.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import com.maxmind.geoip2.exception.GeoIp2Exception;
+import com.maxmind.geoip2.model.IspResponse;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.maxmind.DatabaseReader;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"ISP", "enrich", "ip", "maxmind"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Looks up ISP information for an IP address and adds 
the information to FlowFile attributes. The "
+        + "ISP data is provided as a MaxMind ISP database (Note that this is 
NOT the same as the GeoLite database utilized" +
+        "by some geo enrichment tools). The attribute that contains the IP 
address to lookup is provided by the " +
+        "'IP Address Attribute' property. If the name of the attribute 
provided is 'X', then the the attributes added by" +
+        " enrichment will take the form X.isp.<fieldName>")
+@WritesAttributes({
+    @WritesAttribute(attribute = "X.isp.lookup.micros", description = "The 
number of microseconds that the geo lookup took"),
+    @WritesAttribute(attribute = "X.isp.asn", description = "The Autonomous 
System Number (ASN) identified for the IP address"),
+    @WritesAttribute(attribute = "X.isp.asn.organization", description = "The 
Organization Associated with the ASN identified"),
+    @WritesAttribute(attribute = "X.isp.name", description = "The name of the 
ISP associated with the IP address provided"),
+    @WritesAttribute(attribute = "X.isp.organization", description = "The 
Organization associated with the IP address provided"),})
+public class ISPEnrichIP extends AbstractEnrichIP {
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final DatabaseReader dbReader = databaseReaderRef.get();
+        final String ipAttributeName = 
context.getProperty(IP_ADDRESS_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
+        final String ipAttributeValue = flowFile.getAttribute(ipAttributeName);
+
+        if (StringUtils.isEmpty(ipAttributeName)) {
+            session.transfer(flowFile, REL_NOT_FOUND);
+            getLogger().warn("FlowFile '{}' attribute '{}' was empty. Routing 
to failure",
+                    new Object[]{flowFile, 
IP_ADDRESS_ATTRIBUTE.getDisplayName()});
+            return;
+        }
+
+        InetAddress inetAddress = null;
+        IspResponse response = null;
+
+        try {
+            inetAddress = InetAddress.getByName(ipAttributeValue);
+        } catch (final IOException ioe) {
+            session.transfer(flowFile, REL_NOT_FOUND);
+            getLogger().warn("Could not resolve the IP for value '{}', 
contained within the attribute '{}' in " +
+                            "FlowFile '{}'. This is usually caused by issue 
resolving the appropriate DNS record or " +
+                            "providing the processor with an invalid IP 
address ",
+                    new Object[]{ipAttributeValue, 
IP_ADDRESS_ATTRIBUTE.getDisplayName(), flowFile}, ioe);
+            return;
+        }
+        final StopWatch stopWatch = new StopWatch(true);
+        try {
+            response = dbReader.isp(inetAddress);
+            stopWatch.stop();
+        } catch (final IOException | GeoIp2Exception ex) {
+            // Note IOException is captured again as dbReader also makes 
InetAddress.getByName() calls.
+            // Most name or IP resolutions failure should have been triggered 
in the try loop above but
+            // environmental conditions may trigger errors during the second 
resolution as well.
+            session.transfer(flowFile, REL_NOT_FOUND);
+            getLogger().warn("Failure while trying to find enrichment data for 
{} due to {}", new Object[]{flowFile, ex}, ex);
+            return;
+        }
+
+        if (response == null) {
+            session.transfer(flowFile, REL_NOT_FOUND);
+            return;
+        }
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(new 
StringBuilder(ipAttributeName).append(".isp.lookup.micros").toString(), 
String.valueOf(stopWatch.getDuration(TimeUnit.MICROSECONDS)));
+
+
+
+        // During test I observed behavior where null values in ASN data could 
trigger NPEs. Instead of relying on the
+        // underlying database to be free from Nulls wrapping ensure equality 
to null without assigning a variable
+        // seem like good option to "final int asn ..." as with the other 
returned data.
+        if (!(response.getAutonomousSystemNumber() == null)) {
+            attrs.put(new 
StringBuilder(ipAttributeName).append(".isp.asn").toString(), 
String.valueOf(response.getAutonomousSystemNumber()));
+        }
+        final String asnOrg = response.getAutonomousSystemOrganization();
+        if (asnOrg != null) {
+            attrs.put(new 
StringBuilder(ipAttributeName).append(".isp.asn.organization").toString(), 
asnOrg);
+        }
+
+        final String ispName = response.getIsp();
+        if (ispName != null) {
+            attrs.put(new 
StringBuilder(ipAttributeName).append(".isp.name").toString(), ispName);
+        }
+
+        final String organisation = response.getOrganization();
+        if (organisation  != null) {
+            attrs.put(new 
StringBuilder(ipAttributeName).append(".isp.organization").toString(), 
organisation);
+        }
+
+        flowFile = session.putAllAttributes(flowFile, attrs);
+
+        session.transfer(flowFile, REL_FOUND);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cacc52c/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 5871892..f9ef597 100644
--- 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,5 +14,6 @@
 # limitations under the License.
 
 org.apache.nifi.processors.GeoEnrichIP
+org.apache.nifi.processors.ISPEnrichIP
 org.apache.nifi.processors.enrich.QueryDNS
 org.apache.nifi.processors.enrich.QueryWhois
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/5cacc52c/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestISPEnrichIP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestISPEnrichIP.java
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestISPEnrichIP.java
new file mode 100644
index 0000000..d1dcf9c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestISPEnrichIP.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.maxmind.geoip2.exception.GeoIp2Exception;
+import com.maxmind.geoip2.model.IspResponse;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.maxmind.DatabaseReader;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ISPEnrichIP.class})
+@SuppressWarnings("WeakerAccess")
+public class TestISPEnrichIP {
+    DatabaseReader databaseReader;
+    ISPEnrichIP ispEnrichIP;
+    TestRunner testRunner;
+
+    @Before
+    public void setUp() throws Exception {
+        mockStatic(InetAddress.class);
+        databaseReader = mock(DatabaseReader.class);
+        ispEnrichIP = new TestableIspEnrichIP();
+        testRunner = TestRunners.newTestRunner(ispEnrichIP);
+    }
+
+    @Test
+    public void verifyNonExistentIpFlowsToNotFoundRelationship() throws 
Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        testRunner.enqueue(new byte[0], Collections.emptyMap());
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(1, notFound.size());
+        assertEquals(0, found.size());
+
+        verify(databaseReader).isp(InetAddress.getByName(null));
+    }
+
+    @Test
+    public void successfulMaxMindResponseShouldFlowToFoundRelationship() 
throws Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        final IspResponse ispResponse = getIspResponse();
+
+        
when(databaseReader.isp(InetAddress.getByName("1.2.3.4"))).thenReturn(ispResponse);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip", "1.2.3.4");
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(0, notFound.size());
+        assertEquals(1, found.size());
+
+        FlowFile finishedFound = found.get(0);
+        assertNotNull(finishedFound.getAttribute("ip.isp.lookup.micros"));
+        assertEquals("Apache NiFi - Test ISP", 
finishedFound.getAttribute("ip.isp.name"));
+        assertEquals("Apache NiFi - Test Organization", 
finishedFound.getAttribute("ip.isp.organization"));
+        assertEquals("1337", finishedFound.getAttribute("ip.isp.asn"));
+        assertEquals("Apache NiFi - Test Chocolate", 
finishedFound.getAttribute("ip.isp.asn.organization"));
+    }
+
+    @Test
+    public void 
successfulMaxMindResponseShouldFlowToFoundRelationshipWhenAsnIsNotSet() throws 
Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        final IspResponse ispResponse = getIspResponseWithoutASNDetail();
+
+        
when(databaseReader.isp(InetAddress.getByName("1.2.3.4"))).thenReturn(ispResponse);
+
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip", "1.2.3.4");
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(0, notFound.size());
+        assertEquals(1, found.size());
+
+        FlowFile finishedFound = found.get(0);
+        assertNotNull(finishedFound.getAttribute("ip.isp.lookup.micros"));
+        assertNotNull(finishedFound.getAttribute("ip.isp.lookup.micros"));
+        assertEquals("Apache NiFi - Test ISP", 
finishedFound.getAttribute("ip.isp.name"));
+        assertEquals("Apache NiFi - Test Organization", 
finishedFound.getAttribute("ip.isp.organization"));
+        assertNull(finishedFound.getAttribute("ip.isp.asn"));
+        assertNull(finishedFound.getAttribute("ip.isp.asn.organization"));
+    }
+
+    @Test
+    public void 
evaluatingExpressionLanguageShouldAndFindingIpFieldWithSuccessfulLookUpShouldFlowToFoundRelationship()
 throws Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, 
"${ip.fields:substringBefore(',')}");
+
+        final IspResponse ispResponse = getIspResponse();
+        
when(databaseReader.isp(InetAddress.getByName("1.2.3.4"))).thenReturn(ispResponse);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip.fields", "ip0,ip1,ip2");
+        attributes.put("ip0", "1.2.3.4");
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(0, notFound.size());
+        assertEquals(1, found.size());
+
+        FlowFile finishedFound = found.get(0);
+        assertNotNull(finishedFound.getAttribute("ip0.isp.lookup.micros"));
+        assertEquals("Apache NiFi - Test ISP", 
finishedFound.getAttribute("ip0.isp.name"));
+        assertEquals("Apache NiFi - Test Organization", 
finishedFound.getAttribute("ip0.isp.organization"));
+        assertEquals("1337", finishedFound.getAttribute("ip0.isp.asn"));
+        assertEquals("Apache NiFi - Test Chocolate", 
finishedFound.getAttribute("ip0.isp.asn.organization"));
+
+    }
+
+    @Test
+    public void shouldFlowToNotFoundWhenNullResponseFromMaxMind() throws 
Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        
when(databaseReader.isp(InetAddress.getByName("1.2.3.4"))).thenReturn(null);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip", "1.2.3.4");
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(1, notFound.size());
+        assertEquals(0, found.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldFlowToNotFoundWhenIOExceptionThrownFromMaxMind() throws 
Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+
+        
when(databaseReader.isp(InetAddress.getByName("1.2.3.4"))).thenThrow(IOException.class);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip", "1.2.3.4");
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(1, notFound.size());
+        assertEquals(0, found.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldFlowToNotFoundWhenGeoIp2ExceptionThrownFromMaxMind() 
throws Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        
when(databaseReader.isp(InetAddress.getByName("1.2.3.4"))).thenThrow(GeoIp2Exception.class);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip", "1.2.3.4");
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(1, notFound.size());
+        assertEquals(0, found.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void 
whenInetAddressThrowsUnknownHostFlowFileShouldBeSentToNotFound() throws 
Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip", "somenonexistentdomain.comm");
+
+        
when(InetAddress.getByName("somenonexistentdomain.comm")).thenThrow(UnknownHostException.class);
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = 
testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(1, notFound.size());
+        assertEquals(0, found.size());
+
+        verify(databaseReader).close();
+        verifyNoMoreInteractions(databaseReader);
+    }
+
+    private IspResponse getIspResponse() throws Exception {
+        // Taken from MaxMind unit tests.
+        final String maxMindIspResponse = "{\n" +
+            "         \"isp\" : \"Apache NiFi - Test ISP\",\n" +
+            "         \"organization\" : \"Apache NiFi - Test 
Organization\",\n" +
+            "         \"autonomous_system_number\" : 1337,\n" +
+            "         \"autonomous_system_organization\" : \"Apache NiFi - 
Test Chocolate\" \n" +
+            "      }\n";
+
+        InjectableValues inject = new 
InjectableValues.Std().addValue("locales", Collections.singletonList("en"));
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+
+
+        return new 
ObjectMapper().reader(IspResponse.class).with(inject).readValue(maxMindIspResponse);
+
+    }    private IspResponse getIspResponseWithoutASNDetail() throws Exception 
{
+        // Taken from MaxMind unit tests.
+        final String maxMindIspResponse = "{\n" +
+            "         \"isp\" : \"Apache NiFi - Test ISP\",\n" +
+            "         \"organization\" : \"Apache NiFi - Test 
Organization\",\n" +
+            "         \"autonomous_system_number\" : null " +
+            "      }\n";
+
+        InjectableValues inject = new 
InjectableValues.Std().addValue("locales", Collections.singletonList("en"));
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+
+
+        return new 
ObjectMapper().reader(IspResponse.class).with(inject).readValue(maxMindIspResponse);
+    }
+
+
+    class TestableIspEnrichIP extends ISPEnrichIP {
+        @OnScheduled
+        @Override
+        public void onScheduled(ProcessContext context) throws IOException {
+            databaseReaderRef.set(databaseReader);
+        }
+    }
+
+
+}

Reply via email to