Repository: nifi Updated Branches: refs/heads/master 4303e4742 -> 5cacc52cf
NIFI-2661 - Abstract the GeoEnrichIP processor NIFI-2661 - Add accuracy radius to GeoEnrichIP NIFI-2661 - Introduce ISPEnrichIP Processor Signed-off-by: Matt Burgess <[email protected]> NIFI-2661 - Addresses PR comments Signed-off-by: Matt Burgess <[email protected]> NIFI-2661 - Addresses PR comments Signed-off-by: Matt Burgess <[email protected]> NIFI-2661: Fixed typos and reversed logging params This closes #1650 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5cacc52c Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5cacc52c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5cacc52c Branch: refs/heads/master Commit: 5cacc52cfc70877f262218297fd54bb3ace72ce3 Parents: 4303e47 Author: Andre F de Miranda <[email protected]> Authored: Wed Apr 5 12:05:51 2017 +1000 Committer: Matt Burgess <[email protected]> Committed: Wed Apr 26 09:10:48 2017 -0400 ---------------------------------------------------------------------- .../nifi/processors/AbstractEnrichIP.java | 120 +++++++ .../org/apache/nifi/processors/GeoEnrichIP.java | 113 ++----- .../org/apache/nifi/processors/ISPEnrichIP.java | 142 +++++++++ .../org.apache.nifi.processor.Processor | 1 + .../apache/nifi/processors/TestISPEnrichIP.java | 314 +++++++++++++++++++ 5 files changed, 597 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/5cacc52c/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java new file mode 100644 index 0000000..f55e701 --- 
/dev/null +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/AbstractEnrichIP.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.maxmind.DatabaseReader; +import org.apache.nifi.util.StopWatch; + +public abstract class AbstractEnrichIP extends AbstractProcessor { + + public static final 
PropertyDescriptor GEO_DATABASE_FILE = new PropertyDescriptor.Builder() + // Name has been left untouched so that we don't cause a breaking change + // but ideally this should be renamed to MaxMind Database File or something similar + .name("Geo Database File") + .displayName("MaxMind Database File") + .description("Path to Maxmind IP Enrichment Database File") + .required(true) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .build(); + + public static final PropertyDescriptor IP_ADDRESS_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("IP Address Attribute") + .displayName("IP Address Attribute") + .required(true) + .description("The name of an attribute whose value is a dotted decimal IP address for which enrichment should occur") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) + .build(); + + public static final Relationship REL_FOUND = new Relationship.Builder() + .name("found") + .description("Where to route flow files after successfully enriching attributes with data provided by database") + .build(); + + public static final Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("Where to route flow files after unsuccessfully enriching attributes because no data was found") + .build(); + + private Set<Relationship> relationships; + private List<PropertyDescriptor> propertyDescriptors; + final AtomicReference<DatabaseReader> databaseReaderRef = new AtomicReference<>(null); + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException { + final String dbFileString = 
context.getProperty(GEO_DATABASE_FILE).getValue(); + final File dbFile = new File(dbFileString); + final StopWatch stopWatch = new StopWatch(true); + final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build(); + stopWatch.stop(); + getLogger().info("Completed loading of Maxmind Database. Elapsed time was {} milliseconds.", new Object[]{stopWatch.getDuration(TimeUnit.MILLISECONDS)}); + databaseReaderRef.set(reader); + } + + @OnStopped + public void closeReader() throws IOException { + final DatabaseReader reader = databaseReaderRef.get(); + if (reader != null) { + reader.close(); + } + } + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_FOUND); + rels.add(REL_NOT_FOUND); + this.relationships = Collections.unmodifiableSet(rels); + + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(GEO_DATABASE_FILE); + props.add(IP_ADDRESS_ATTRIBUTE); + this.propertyDescriptors = Collections.unmodifiableList(props); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cacc52c/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java index c24799e..175d873 100644 --- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/GeoEnrichIP.java @@ -16,18 +16,11 @@ */ package org.apache.nifi.processors; -import java.io.File; import java.io.IOException; import java.net.InetAddress; -import java.util.ArrayList; -import 
java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; @@ -39,18 +32,10 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.maxmind.DatabaseReader; import org.apache.nifi.util.StopWatch; @@ -70,6 +55,7 @@ import com.maxmind.geoip2.record.Subdivision; @WritesAttributes({ @WritesAttribute(attribute = "X.geo.lookup.micros", description = "The number of microseconds that the geo lookup took"), @WritesAttribute(attribute = "X.geo.city", description = "The city identified for the IP address"), + @WritesAttribute(attribute = "X.geo.accuracy", description = "The accuracy radius if provided by the database (in Kilometers)"), @WritesAttribute(attribute = "X.geo.latitude", description = "The latitude identified for this IP address"), @WritesAttribute(attribute = "X.geo.longitude", description = "The longitude identified for this IP 
address"), @WritesAttribute(attribute = "X.geo.subdivision.N", @@ -78,81 +64,7 @@ import com.maxmind.geoip2.record.Subdivision; @WritesAttribute(attribute = "X.geo.country", description = "The country identified for this IP address"), @WritesAttribute(attribute = "X.geo.country.isocode", description = "The ISO Code for the country identified"), @WritesAttribute(attribute = "X.geo.postalcode", description = "The postal code for the country identified"),}) -public class GeoEnrichIP extends AbstractProcessor { - - public static final PropertyDescriptor GEO_DATABASE_FILE = new PropertyDescriptor.Builder() - .name("Geo Database File") - .displayName("Geo Database File") - .description("Path to Maxmind Geo Enrichment Database File") - .required(true) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .build(); - - public static final PropertyDescriptor IP_ADDRESS_ATTRIBUTE = new PropertyDescriptor.Builder() - .name("IP Address Attribute") - .displayName("IP Address Attribute") - .required(true) - .description("The name of an attribute whose value is a dotted decimal IP address for which enrichment should occur") - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) - .build(); - - public static final Relationship REL_FOUND = new Relationship.Builder() - .name("found") - .description("Where to route flow files after successfully enriching attributes with geo data") - .build(); - - public static final Relationship REL_NOT_FOUND = new Relationship.Builder() - .name("not found") - .description("Where to route flow files after unsuccessfully enriching attributes because no geo data was found") - .build(); - - private Set<Relationship> relationships; - private List<PropertyDescriptor> propertyDescriptors; - final AtomicReference<DatabaseReader> databaseReaderRef = new AtomicReference<>(null); - - @Override - 
public Set<Relationship> getRelationships() { - return relationships; - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return propertyDescriptors; - } - - @OnScheduled - public void onScheduled(final ProcessContext context) throws IOException { - final String dbFileString = context.getProperty(GEO_DATABASE_FILE).getValue(); - final File dbFile = new File(dbFileString); - final StopWatch stopWatch = new StopWatch(true); - final DatabaseReader reader = new DatabaseReader.Builder(dbFile).build(); - stopWatch.stop(); - getLogger().info("Completed loading of Maxmind Geo Database. Elapsed time was {} milliseconds.", new Object[]{stopWatch.getDuration(TimeUnit.MILLISECONDS)}); - databaseReaderRef.set(reader); - } - - @OnStopped - public void closeReader() throws IOException { - final DatabaseReader reader = databaseReaderRef.get(); - if (reader != null) { - reader.close(); - } - } - - @Override - protected void init(final ProcessorInitializationContext context) { - final Set<Relationship> rels = new HashSet<>(); - rels.add(REL_FOUND); - rels.add(REL_NOT_FOUND); - this.relationships = Collections.unmodifiableSet(rels); - - final List<PropertyDescriptor> props = new ArrayList<>(); - props.add(GEO_DATABASE_FILE); - props.add(IP_ADDRESS_ATTRIBUTE); - this.propertyDescriptors = Collections.unmodifiableList(props); - } +public class GeoEnrichIP extends AbstractEnrichIP { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { @@ -164,11 +76,14 @@ public class GeoEnrichIP extends AbstractProcessor { final DatabaseReader dbReader = databaseReaderRef.get(); final String ipAttributeName = context.getProperty(IP_ADDRESS_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue(); final String ipAttributeValue = flowFile.getAttribute(ipAttributeName); - if (StringUtils.isEmpty(ipAttributeName)) { //TODO need to add additional validation - should look like an IPv4 or IPv6 addr 
for instance + + if (StringUtils.isEmpty(ipAttributeName)) { session.transfer(flowFile, REL_NOT_FOUND); - getLogger().warn("Unable to find ip address for {}", new Object[]{flowFile}); + getLogger().warn("FlowFile '{}' attribute '{}' was empty. Routing to failure", + new Object[]{flowFile, IP_ADDRESS_ATTRIBUTE.getDisplayName()}); return; } + InetAddress inetAddress = null; CityResponse response = null; @@ -176,14 +91,21 @@ public class GeoEnrichIP extends AbstractProcessor { inetAddress = InetAddress.getByName(ipAttributeValue); } catch (final IOException ioe) { session.transfer(flowFile, REL_NOT_FOUND); - getLogger().warn("Could not resolve {} to ip address for {}", new Object[]{ipAttributeValue, flowFile}, ioe); + getLogger().warn("Could not resolve the IP for value '{}', contained within the attribute '{}' in " + + "FlowFile '{}'. This is usually caused by issue resolving the appropriate DNS record or " + + "providing the processor with an invalid IP address ", + new Object[]{ipAttributeValue, IP_ADDRESS_ATTRIBUTE.getDisplayName(), flowFile}, ioe); return; } + final StopWatch stopWatch = new StopWatch(true); try { response = dbReader.city(inetAddress); stopWatch.stop(); } catch (final IOException | GeoIp2Exception ex) { + // Note IOException is captured again as dbReader also makes InetAddress.getByName() calls. + // Most name or IP resolutions failure should have been triggered in the try loop above but + // environmental conditions may trigger errors during the second resolution as well. 
session.transfer(flowFile, REL_NOT_FOUND); getLogger().warn("Failure while trying to find enrichment data for {} due to {}", new Object[]{flowFile, ex}, ex); return; @@ -208,6 +130,11 @@ public class GeoEnrichIP extends AbstractProcessor { attrs.put(new StringBuilder(ipAttributeName).append(".geo.longitude").toString(), longitude.toString()); } + final Integer accuracy = response.getLocation().getAccuracyRadius(); + if (accuracy != null) { + attrs.put(new StringBuilder(ipAttributeName).append(".accuracy").toString(), String.valueOf(accuracy)); + } + int i = 0; for (final Subdivision subd : response.getSubdivisions()) { attrs.put(new StringBuilder(ipAttributeName).append(".geo.subdivision.").append(i).toString(), subd.getName()); http://git-wip-us.apache.org/repos/asf/nifi/blob/5cacc52c/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/ISPEnrichIP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/ISPEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/ISPEnrichIP.java new file mode 100644 index 0000000..fc159c9 --- /dev/null +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/ISPEnrichIP.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors; + +import com.maxmind.geoip2.exception.GeoIp2Exception; +import com.maxmind.geoip2.model.IspResponse; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.maxmind.DatabaseReader; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"ISP", "enrich", "ip", "maxmind"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Looks up ISP information for an IP address and adds the information to FlowFile attributes. 
The " + + "ISP data is provided as a MaxMind ISP database (Note that this is NOT the same as the GeoLite database utilized" + + "by some geo enrichment tools). The attribute that contains the IP address to lookup is provided by the " + + "'IP Address Attribute' property. If the name of the attribute provided is 'X', then the the attributes added by" + + " enrichment will take the form X.isp.<fieldName>") +@WritesAttributes({ + @WritesAttribute(attribute = "X.isp.lookup.micros", description = "The number of microseconds that the geo lookup took"), + @WritesAttribute(attribute = "X.isp.asn", description = "The Autonomous System Number (ASN) identified for the IP address"), + @WritesAttribute(attribute = "X.isp.asn.organization", description = "The Organization Associated with the ASN identified"), + @WritesAttribute(attribute = "X.isp.name", description = "The name of the ISP associated with the IP address provided"), + @WritesAttribute(attribute = "X.isp.organization", description = "The Organization associated with the IP address provided"),}) +public class ISPEnrichIP extends AbstractEnrichIP { + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final DatabaseReader dbReader = databaseReaderRef.get(); + final String ipAttributeName = context.getProperty(IP_ADDRESS_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue(); + final String ipAttributeValue = flowFile.getAttribute(ipAttributeName); + + if (StringUtils.isEmpty(ipAttributeName)) { + session.transfer(flowFile, REL_NOT_FOUND); + getLogger().warn("FlowFile '{}' attribute '{}' was empty. 
Routing to failure", + new Object[]{flowFile, IP_ADDRESS_ATTRIBUTE.getDisplayName()}); + return; + } + + InetAddress inetAddress = null; + IspResponse response = null; + + try { + inetAddress = InetAddress.getByName(ipAttributeValue); + } catch (final IOException ioe) { + session.transfer(flowFile, REL_NOT_FOUND); + getLogger().warn("Could not resolve the IP for value '{}', contained within the attribute '{}' in " + + "FlowFile '{}'. This is usually caused by issue resolving the appropriate DNS record or " + + "providing the processor with an invalid IP address ", + new Object[]{ipAttributeValue, IP_ADDRESS_ATTRIBUTE.getDisplayName(), flowFile}, ioe); + return; + } + final StopWatch stopWatch = new StopWatch(true); + try { + response = dbReader.isp(inetAddress); + stopWatch.stop(); + } catch (final IOException | GeoIp2Exception ex) { + // Note IOException is captured again as dbReader also makes InetAddress.getByName() calls. + // Most name or IP resolutions failure should have been triggered in the try loop above but + // environmental conditions may trigger errors during the second resolution as well. + session.transfer(flowFile, REL_NOT_FOUND); + getLogger().warn("Failure while trying to find enrichment data for {} due to {}", new Object[]{flowFile, ex}, ex); + return; + } + + if (response == null) { + session.transfer(flowFile, REL_NOT_FOUND); + return; + } + + final Map<String, String> attrs = new HashMap<>(); + attrs.put(new StringBuilder(ipAttributeName).append(".isp.lookup.micros").toString(), String.valueOf(stopWatch.getDuration(TimeUnit.MICROSECONDS))); + + + + // During test I observed behavior where null values in ASN data could trigger NPEs. Instead of relying on the + // underlying database to be free from Nulls wrapping ensure equality to null without assigning a variable + // seem like good option to "final int asn ..." as with the other returned data. 
+ if (!(response.getAutonomousSystemNumber() == null)) { + attrs.put(new StringBuilder(ipAttributeName).append(".isp.asn").toString(), String.valueOf(response.getAutonomousSystemNumber())); + } + final String asnOrg = response.getAutonomousSystemOrganization(); + if (asnOrg != null) { + attrs.put(new StringBuilder(ipAttributeName).append(".isp.asn.organization").toString(), asnOrg); + } + + final String ispName = response.getIsp(); + if (ispName != null) { + attrs.put(new StringBuilder(ipAttributeName).append(".isp.name").toString(), ispName); + } + + final String organisation = response.getOrganization(); + if (organisation != null) { + attrs.put(new StringBuilder(ipAttributeName).append(".isp.organization").toString(), organisation); + } + + flowFile = session.putAllAttributes(flowFile, attrs); + + session.transfer(flowFile, REL_FOUND); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/5cacc52c/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 5871892..f9ef597 100644 --- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,5 +14,6 @@ # limitations under the License. 
org.apache.nifi.processors.GeoEnrichIP +org.apache.nifi.processors.ISPEnrichIP org.apache.nifi.processors.enrich.QueryDNS org.apache.nifi.processors.enrich.QueryWhois \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/5cacc52c/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestISPEnrichIP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestISPEnrichIP.java b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestISPEnrichIP.java new file mode 100644 index 0000000..d1dcf9c --- /dev/null +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/TestISPEnrichIP.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.maxmind.geoip2.exception.GeoIp2Exception; +import com.maxmind.geoip2.model.IspResponse; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.maxmind.DatabaseReader; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.mockStatic; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ISPEnrichIP.class}) +@SuppressWarnings("WeakerAccess") +public class TestISPEnrichIP { + DatabaseReader databaseReader; + ISPEnrichIP ispEnrichIP; + TestRunner testRunner; + + @Before + public void setUp() throws Exception { + mockStatic(InetAddress.class); + databaseReader = mock(DatabaseReader.class); + ispEnrichIP = new TestableIspEnrichIP(); + testRunner = TestRunners.newTestRunner(ispEnrichIP); + } + + @Test + public void 
verifyNonExistentIpFlowsToNotFoundRelationship() throws Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        testRunner.enqueue(new byte[0], Collections.emptyMap());
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(1, notFound.size());
+        assertEquals(0, found.size());
+
+        // The flow file carries no "ip" attribute, so the lookup is expected to be made with
+        // InetAddress.getByName(null), which the JDK resolves to a loopback address.
+        verify(databaseReader).isp(InetAddress.getByName(null));
+    }
+
+    /**
+     * Stubs the database reader to return a complete ISP response for 1.2.3.4 and checks that the
+     * flow file is routed to {@code REL_FOUND} with every {@code ip.isp.*} attribute populated.
+     */
+    @Test
+    public void successfulMaxMindResponseShouldFlowToFoundRelationship() throws Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        final IspResponse ispResponse = getIspResponse();
+
+        when(databaseReader.isp(InetAddress.getByName("1.2.3.4"))).thenReturn(ispResponse);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip", "1.2.3.4");
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(0, notFound.size());
+        assertEquals(1, found.size());
+
+        FlowFile finishedFound = found.get(0);
+        assertNotNull(finishedFound.getAttribute("ip.isp.lookup.micros"));
+        assertEquals("Apache NiFi - Test ISP", finishedFound.getAttribute("ip.isp.name"));
+        assertEquals("Apache NiFi - Test Organization", finishedFound.getAttribute("ip.isp.organization"));
+        assertEquals("1337", finishedFound.getAttribute("ip.isp.asn"));
+        assertEquals("Apache NiFi - Test Chocolate", finishedFound.getAttribute("ip.isp.asn.organization"));
+    }
+
+    /**
+     * Same as the successful lookup above, but the stubbed response has no autonomous-system
+     * details: the flow file must still reach {@code REL_FOUND}, and the two {@code ip.isp.asn*}
+     * attributes must be absent.
+     */
+    @Test
+    public void successfulMaxMindResponseShouldFlowToFoundRelationshipWhenAsnIsNotSet() throws Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        final IspResponse ispResponse = getIspResponseWithoutASNDetail();
+
+        when(databaseReader.isp(InetAddress.getByName("1.2.3.4"))).thenReturn(ispResponse);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip", "1.2.3.4");
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(0, notFound.size());
+        assertEquals(1, found.size());
+
+        FlowFile finishedFound = found.get(0);
+        assertNotNull(finishedFound.getAttribute("ip.isp.lookup.micros"));
+        assertEquals("Apache NiFi - Test ISP", finishedFound.getAttribute("ip.isp.name"));
+        assertEquals("Apache NiFi - Test Organization", finishedFound.getAttribute("ip.isp.organization"));
+        assertNull(finishedFound.getAttribute("ip.isp.asn"));
+        assertNull(finishedFound.getAttribute("ip.isp.asn.organization"));
+    }
+
+    /**
+     * Configures the IP attribute property with an Expression Language statement instead of a
+     * literal name. With {@code ip.fields = "ip0,ip1,ip2"} the expression selects {@code ip0}, so
+     * the enrichment attributes are expected under the {@code ip0.isp.} prefix.
+     */
+    @Test
+    public void evaluatingExpressionLanguageShouldAndFindingIpFieldWithSuccessfulLookUpShouldFlowToFoundRelationship() throws Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "${ip.fields:substringBefore(',')}");
+
+        final IspResponse ispResponse = getIspResponse();
+        when(databaseReader.isp(InetAddress.getByName("1.2.3.4"))).thenReturn(ispResponse);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip.fields", "ip0,ip1,ip2");
+        attributes.put("ip0", "1.2.3.4");
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(0, notFound.size());
+        assertEquals(1, found.size());
+
+        FlowFile finishedFound = found.get(0);
+        assertNotNull(finishedFound.getAttribute("ip0.isp.lookup.micros"));
+        assertEquals("Apache NiFi - Test ISP", finishedFound.getAttribute("ip0.isp.name"));
+        assertEquals("Apache NiFi - Test Organization", finishedFound.getAttribute("ip0.isp.organization"));
+        assertEquals("1337", finishedFound.getAttribute("ip0.isp.asn"));
+        assertEquals("Apache NiFi - Test Chocolate", finishedFound.getAttribute("ip0.isp.asn.organization"));
+    }
+
+    /**
+     * A {@code null} response from the database reader must route the flow file to
+     * {@code REL_NOT_FOUND}.
+     */
+    @Test
+    public void shouldFlowToNotFoundWhenNullResponseFromMaxMind() throws Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        when(databaseReader.isp(InetAddress.getByName("1.2.3.4"))).thenReturn(null);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip", "1.2.3.4");
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(1, notFound.size());
+        assertEquals(0, found.size());
+    }
+
+    /**
+     * An {@link IOException} thrown by the database reader must route the flow file to
+     * {@code REL_NOT_FOUND}.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldFlowToNotFoundWhenIOExceptionThrownFromMaxMind() throws Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        when(databaseReader.isp(InetAddress.getByName("1.2.3.4"))).thenThrow(IOException.class);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip", "1.2.3.4");
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(1, notFound.size());
+        assertEquals(0, found.size());
+    }
+
+    /**
+     * A {@code GeoIp2Exception} thrown by the database reader must route the flow file to
+     * {@code REL_NOT_FOUND}.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldFlowToNotFoundWhenGeoIp2ExceptionThrownFromMaxMind() throws Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        when(databaseReader.isp(InetAddress.getByName("1.2.3.4"))).thenThrow(GeoIp2Exception.class);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip", "1.2.3.4");
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(1, notFound.size());
+        assertEquals(0, found.size());
+    }
+
+    /**
+     * When the host name cannot be resolved, the flow file must go to {@code REL_NOT_FOUND} and
+     * the only interaction with the database reader must be {@code close()}.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void whenInetAddressThrowsUnknownHostFlowFileShouldBeSentToNotFound() throws Exception {
+        testRunner.setProperty(ISPEnrichIP.GEO_DATABASE_FILE, "./");
+        testRunner.setProperty(ISPEnrichIP.IP_ADDRESS_ATTRIBUTE, "ip");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("ip", "somenonexistentdomain.comm");
+
+        // NOTE(review): InetAddress.getByName is a static JDK method; stubbing it this way only
+        // works if static mocking (presumably PowerMock) is configured on the test class, which
+        // is declared outside this view -- confirm.
+        when(InetAddress.getByName("somenonexistentdomain.comm")).thenThrow(UnknownHostException.class);
+
+        testRunner.enqueue(new byte[0], attributes);
+
+        testRunner.run();
+
+        List<MockFlowFile> notFound = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_NOT_FOUND);
+        List<MockFlowFile> found = testRunner.getFlowFilesForRelationship(ISPEnrichIP.REL_FOUND);
+
+        assertEquals(1, notFound.size());
+        assertEquals(0, found.size());
+
+        verify(databaseReader).close();
+        verifyNoMoreInteractions(databaseReader);
+    }
+
+    /**
+     * Builds an {@link IspResponse} carrying ISP, organization and autonomous-system details.
+     *
+     * @return the response deserialized from a fixed JSON document
+     * @throws Exception if the JSON cannot be deserialized
+     */
+    private IspResponse getIspResponse() throws Exception {
+        // Taken from MaxMind unit tests.
+        final String maxMindIspResponse = "{\n" +
+            " \"isp\" : \"Apache NiFi - Test ISP\",\n" +
+            " \"organization\" : \"Apache NiFi - Test Organization\",\n" +
+            " \"autonomous_system_number\" : 1337,\n" +
+            " \"autonomous_system_organization\" : \"Apache NiFi - Test Chocolate\" \n" +
+            " }\n";
+
+        return readIspResponse(maxMindIspResponse);
+    }
+
+    /**
+     * Builds an {@link IspResponse} whose autonomous-system number is {@code null} and whose
+     * autonomous-system organization is omitted.
+     *
+     * @return the response deserialized from a fixed JSON document
+     * @throws Exception if the JSON cannot be deserialized
+     */
+    private IspResponse getIspResponseWithoutASNDetail() throws Exception {
+        // Taken from MaxMind unit tests.
+        final String maxMindIspResponse = "{\n" +
+            " \"isp\" : \"Apache NiFi - Test ISP\",\n" +
+            " \"organization\" : \"Apache NiFi - Test Organization\",\n" +
+            " \"autonomous_system_number\" : null " +
+            " }\n";
+
+        return readIspResponse(maxMindIspResponse);
+    }
+
+    /**
+     * Deserializes a MaxMind-style ISP JSON document into an {@link IspResponse}.
+     *
+     * @param json the JSON document to parse
+     * @return the deserialized response
+     * @throws IOException if the document cannot be parsed or mapped
+     */
+    private IspResponse readIspResponse(final String json) throws IOException {
+        final InjectableValues inject = new InjectableValues.Std().addValue("locales", Collections.singletonList("en"));
+
+        // Read with the mapper that was actually configured; previously the lenient mapper was
+        // built and then discarded in favour of a fresh, unconfigured ObjectMapper.
+        final ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+        return mapper.reader(IspResponse.class).with(inject).readValue(json);
+    }
+
+    /**
+     * {@link ISPEnrichIP} variant for these tests: its {@code onScheduled} stores the test's
+     * {@code databaseReader} in {@code databaseReaderRef} and does not call the parent
+     * implementation. It is an inner (non-static) class because it reads the enclosing test's
+     * {@code databaseReader} field.
+     */
+    class TestableIspEnrichIP extends ISPEnrichIP {
+        @OnScheduled
+        @Override
+        public void onScheduled(ProcessContext context) throws IOException {
+            databaseReaderRef.set(databaseReader);
+        }
+    }
+
+}
