NIFI-1965 - Implement QueryDNS processor

This closes #496.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d6a2409d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d6a2409d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d6a2409d

Branch: refs/heads/master
Commit: d6a2409d71f52b691d4202792419dfea48a422aa
Parents: 4939199
Author: Andre F de Miranda <[email protected]>
Authored: Sun Jun 5 01:41:16 2016 +1000
Committer: Pierre Villard <[email protected]>
Committed: Mon Aug 8 22:27:40 2016 +0200

----------------------------------------------------------------------
 .../nifi-enrich-processors/pom.xml              |  13 +
 .../enrich/AbstractEnrichProcessor.java         | 161 +++++++++++
 .../apache/nifi/processors/enrich/QueryDNS.java | 276 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   3 +-
 .../enrich/FakeDNSInitialDirContextFactory.java |  52 ++++
 .../nifi/processors/enrich/TestQueryDNS.java    | 227 +++++++++++++++
 6 files changed, 731 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d6a2409d/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
index d91914b..7059a2f 100644
--- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml
@@ -20,6 +20,8 @@
         <artifactId>nifi-enrich-bundle</artifactId>
         <version>1.0.0-SNAPSHOT</version>
     </parent>
+
+
     <artifactId>nifi-enrich-processors</artifactId>
     <dependencies>
         <dependency>
@@ -50,5 +52,16 @@
             <artifactId>findbugs-annotations</artifactId>
             <version>1.3.9-1</version>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d6a2409d/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java
new file mode 100644
index 0000000..c576ca2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/AbstractEnrichProcessor.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.enrich;
+
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Base class for enrichment processors that run a query and turn the raw
+ * answer into flow file attributes.
+ *
+ * It declares the query/parser properties and the found / not found
+ * relationships, validates that the parser and its expression are configured
+ * consistently, and offers {@link #parseResponse} to slice a raw record into
+ * {@code enrich.<schema>.record<N>.group<M>} attributes.
+ */
+public abstract class AbstractEnrichProcessor extends AbstractProcessor {
+    public static final PropertyDescriptor QUERY_INPUT = new PropertyDescriptor.Builder()
+            .name("QUERY_INPUT")
+            .displayName("Lookup value")
+            .required(true)
+            .description("The value that should be used to populate the query")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final AllowableValue SPLIT = new AllowableValue("Split", "Split",
+            "Use a delimiter character or RegEx  to split the results into attributes");
+    public static final AllowableValue REGEX = new AllowableValue("RegEx", "RegEx",
+            "Use a regular expression to split the results into attributes ");
+    public static final AllowableValue NONE = new AllowableValue("None", "None",
+            "Do not split results");
+
+    public static final PropertyDescriptor QUERY_PARSER = new PropertyDescriptor.Builder()
+            .name("QUERY_PARSER")
+            .displayName("Results Parser")
+            .description("The method used to slice the results into attribute groups")
+            .allowableValues(SPLIT, REGEX, NONE)
+            .required(true)
+            .defaultValue(NONE.getValue())
+            .build();
+
+    public static final PropertyDescriptor QUERY_PARSER_INPUT = new PropertyDescriptor.Builder()
+            .name("QUERY_PARSER_INPUT")
+            .displayName("Parser RegEx")
+            .description("Choice between a splitter and regex matcher used to parse the results of the query into attribute groups")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    public static final Relationship REL_FOUND = new Relationship.Builder()
+            .name("found")
+            .description("Where to route flow files after successfully enriching attributes with data")
+            .build();
+
+    public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+            .name("not found")
+            .description("Where to route flow files if data enrichment query rendered no results")
+            .build();
+
+
+    /**
+     * Checks that the parser selection and the parser expression agree:
+     * Split and RegEx need an expression, None must not have one, and a
+     * configured expression has to be a compilable regular expression.
+     *
+     * @param validationContext the context holding the configured property values
+     * @return the validation results of the parent class plus any problem found here
+     */
+    @Override
+    public List<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        final String chosenParser = validationContext.getProperty(QUERY_PARSER).getValue();
+        final boolean parserInputIsSet = validationContext.getProperty(QUERY_PARSER_INPUT).isSet();
+        final boolean parserIsNone = NONE.getValue().equals(chosenParser);
+
+        if (!parserIsNone && !parserInputIsSet) {
+            results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
+                    .explanation("Split and Regex parsers require a valid Regular Expression")
+                    .valid(false)
+                    .build());
+        }
+
+        if (parserIsNone && parserInputIsSet) {
+            results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
+                    .explanation("NONE parser does not support the use of Regular Expressions")
+                    .valid(false)
+                    .build());
+        }
+
+        if (!parserIsNone && parserInputIsSet) {
+            // Both String.split and Pattern.compile in parseResponse compile this value,
+            // so reject a malformed expression here instead of failing on every flow file.
+            try {
+                Pattern.compile(validationContext.getProperty(QUERY_PARSER_INPUT).getValue());
+            } catch (final java.util.regex.PatternSyntaxException e) {
+                results.add(new ValidationResult.Builder().input("QUERY_PARSER_INPUT")
+                        .explanation("the configured Regular Expression is not valid: " + e.getDescription())
+                        .valid(false)
+                        .build());
+            }
+        }
+
+        return results;
+    }
+
+
+
+    /**
+     * This method returns the parsed record string in the form of
+     * a map of two strings, consisting of iteration aware attribute
+     * names and their values.
+     *
+     * Attribute names have the form
+     * {@code enrich.<schema>.record<recordPosition>.group<index>}.
+     *
+     * @param recordPosition the iteration counter for the record
+     * @param rawResult the raw query results to be parsed
+     * @param queryParser The parsing mechanism being used to parse the data into groups
+     *                    ("Split", "RegEx"; any other value keeps the record unsplit)
+     * @param queryRegex The regex to be used to split the query results into groups
+     * @param schema the name placed after "enrich." in the generated attribute names
+     * @return Map with attribute names and values
+     */
+    protected Map<String, String> parseResponse(int recordPosition, String rawResult, String queryParser, String queryRegex, String schema) {
+
+        final Map<String, String> results = new HashMap<>();
+        final String keyPrefix = "enrich." + schema + ".record" + recordPosition + ".group";
+
+        switch (queryParser) {
+            case "Split":
+                // Every token produced by the delimiter becomes its own group
+                final String[] tokens = rawResult.split(queryRegex);
+                for (int r = 0; r < tokens.length; r++) {
+                    results.put(keyPrefix + r, tokens[r]);
+                }
+                break;
+
+            case "RegEx":
+                final Matcher matcher = Pattern.compile(queryRegex).matcher(rawResult);
+                if (matcher.find()) {
+                    // Group 0 is the whole match and is kept for the sake of purity.
+                    // groupCount() does not count group 0, so the bound must be inclusive
+                    // or the last capture group would be dropped.
+                    for (int r = 0; r <= matcher.groupCount(); r++) {
+                        final String captured = matcher.group(r);
+                        // A group that did not take part in the match is reported as null;
+                        // skip it rather than emit a null attribute value.
+                        if (captured != null) {
+                            results.put(keyPrefix + r, captured);
+                        }
+                    }
+                }
+                break;
+
+            case "None":
+                // Falls through to the default handling
+            default:
+                // No splitting requested: the whole record becomes group0
+                results.put(keyPrefix + "0", rawResult);
+                break;
+        }
+        return results;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d6a2409d/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java
new file mode 100644
index 0000000..d80a72c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryDNS.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.enrich;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.naming.Context;
+import javax.naming.NameNotFoundException;
+import javax.naming.NamingEnumeration;
+import javax.naming.NamingException;
+import javax.naming.directory.Attributes;
+import javax.naming.directory.BasicAttributes;
+import javax.naming.directory.DirContext;
+import javax.naming.directory.InitialDirContext;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+
+/**
+ * Processor that resolves a per-flow-file lookup value through a JNDI DNS
+ * context and stores every returned record as
+ * {@code enrich.dns.record<N>.group<M>} attributes.
+ *
+ * Flow files with at least one usable record go to "found"; empty answers and
+ * names that do not exist go to "not found".
+ */
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"dns", "enrich", "ip"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("A powerful DNS query processor primary designed to enrich DataFlows with DNS based APIs " +
+        "(e.g. RBLs, ShadowServer's ASN lookup) but that can be also used to perform regular DNS lookups.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "enrich.dns.record*.group*", description = "The captured fields of the DNS query response for each of the records received"),
+})
+public class QueryDNS extends AbstractEnrichProcessor {
+
+    public static final PropertyDescriptor DNS_QUERY_TYPE = new PropertyDescriptor.Builder()
+            .name("DNS_QUERY_TYPE")
+            .displayName("DNS Query Type")
+            .description("The DNS query type to be used by the processor (e.g. TXT, A)")
+            .required(true)
+            .defaultValue("TXT")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor DNS_SERVER = new PropertyDescriptor.Builder()
+            .name("DNS_SERVER")
+            .displayName("DNS Servers")
+            .description("A comma separated list of  DNS servers to be used. (Defaults to system wide if none is used)")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    // The value is handed verbatim to the JNDI environment as a number of
+    // milliseconds, so anything but a positive integer is rejected up front.
+    public static final PropertyDescriptor DNS_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("DNS_TIMEOUT")
+            .displayName("DNS Query Timeout")
+            .description("The amount of milliseconds to wait until considering a query as failed")
+            .required(true)
+            .defaultValue("1500")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    // Same reasoning as DNS_TIMEOUT: the JNDI environment receives this as a count.
+    public static final PropertyDescriptor DNS_RETRIES = new PropertyDescriptor.Builder()
+            .name("DNS_RETRIES")
+            .displayName("DNS Query Retries")
+            .description("The number of attempts before giving up and moving on")
+            .required(true)
+            .defaultValue("1")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+
+    private final static List<PropertyDescriptor> propertyDescriptors;
+    private final static Set<Relationship> relationships;
+
+    // Marker record produced by doLookup when the queried name does not exist
+    private static final String NXDOMAIN = "NXDOMAIN";
+
+    private DirContext ictx;
+
+    // Assign the default and generally used contextFactory value. It is referenced
+    // by name so the provider class does not have to be visible at compile time.
+    private String contextFactory = "com.sun.jndi.dns.DnsContextFactory";
+
+    static {
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(QUERY_INPUT);
+        props.add(QUERY_PARSER);
+        props.add(QUERY_PARSER_INPUT);
+        props.add(DNS_RETRIES);
+        props.add(DNS_TIMEOUT);
+        props.add(DNS_SERVER);
+        props.add(DNS_QUERY_TYPE);
+        propertyDescriptors = Collections.unmodifiableList(props);
+
+        Set<Relationship> rels = new HashSet<>();
+        rels.add(REL_FOUND);
+        rels.add(REL_NOT_FOUND);
+        relationships = Collections.unmodifiableSet(rels);
+    }
+
+    // True once initializeContext has successfully created the JNDI context
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+
+    /**
+     * Looks up the evaluated "Lookup value" of one flow file, copies the parsed
+     * records into its attributes and routes it to found or not found.
+     *
+     * @param context provides the configured property values
+     * @param session used to fetch, update and transfer the flow file
+     * @throws ProcessException if the lookup fails with a NamingException other
+     *                          than a name-not-found answer
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        if (!initialized.get()) {
+            initializeResolver(context);
+            if (!initialized.get()) {
+                // initializeResolver has already logged the cause. Without a context no
+                // lookup is possible, so back off before taking a flow file from the queue.
+                context.yield();
+                return;
+            }
+            getLogger().warn("Resolver was initialized at onTrigger instead of onScheduled");
+        }
+
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String queryType = context.getProperty(DNS_QUERY_TYPE).getValue();
+        final String queryInput = context.getProperty(QUERY_INPUT).evaluateAttributeExpressions(flowFile).getValue();
+        final String queryParser = context.getProperty(QUERY_PARSER).getValue();
+        final String queryRegex = context.getProperty(QUERY_PARSER_INPUT).getValue();
+
+        boolean found = false;
+        try {
+            final Attributes results = doLookup(queryInput, queryType);
+            // NOERROR & NODATA seem to return empty Attributes; an answer that lacks the
+            // requested record type is treated the same way instead of being dereferenced.
+            final javax.naming.directory.Attribute answer =
+                    (results == null || results.size() < 1) ? null : results.get(queryType);
+
+            if (answer != null) {
+                int recordNumber = 0;
+                final NamingEnumeration<?> dnsEntryIterator = answer.getAll();
+
+                while (dnsEntryIterator.hasMoreElements()) {
+                    final String dnsRecord = dnsEntryIterator.next().toString();
+
+                    // doLookup reports a non-existent name as a single NXDOMAIN record;
+                    // compare by value, not by reference.
+                    if (!NXDOMAIN.equals(dnsRecord)) {
+                        final Map<String, String> parsedResults = parseResponse(recordNumber, dnsRecord, queryParser, queryRegex, "dns");
+                        flowFile = session.putAllAttributes(flowFile, parsedResults);
+                        found = true;
+                    } else {
+                        // Otherwise treat as not found
+                        found = false;
+                    }
+
+                    // Increase the counter and iterate over next record....
+                    recordNumber++;
+                }
+            }
+        } catch (NamingException e) {
+            context.yield();
+            throw new ProcessException("Unexpected NamingException while processing records. Please review your configuration.", e);
+        }
+
+        // Finally prepare to send the data down the pipeline
+        if (found) {
+            // Sending the resulting flowfile (with attributes) to REL_FOUND
+            session.transfer(flowFile, REL_FOUND);
+        } else {
+            // NXDOMAIN or empty answer received, accepting the fate but forwarding
+            // to REL_NOT_FOUND
+            session.transfer(flowFile, REL_NOT_FOUND);
+        }
+    }
+
+
+    /**
+     * Creates the resolver when the processor is scheduled.
+     *
+     * @param context provides the configured property values
+     * @throws ProcessException if initializeResolver throws
+     */
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        try {
+            initializeResolver(context);
+        } catch (Exception e) {
+            context.yield();
+            throw new ProcessException("Failed to initialize the JNDI DNS resolver server", e);
+        }
+    }
+
+
+    /**
+     * Builds the JNDI environment from the DNS properties and creates the
+     * context. A NamingException is logged and leaves the processor marked as
+     * not initialized.
+     *
+     * @param context provides the configured property values
+     */
+    protected void initializeResolver(final ProcessContext context ) {
+
+        final String dnsTimeout = context.getProperty(DNS_TIMEOUT).getValue();
+        final String dnsServer = context.getProperty(DNS_SERVER).getValue();
+        final String dnsRetries = context.getProperty(DNS_RETRIES).getValue();
+
+        final Hashtable<String,String> env = new Hashtable<String,String>();
+        env.put("java.naming.factory.initial", contextFactory);
+        env.put("com.sun.jndi.dns.timeout.initial", dnsTimeout);
+        env.put("com.sun.jndi.dns.timeout.retries", dnsRetries);
+        if (StringUtils.isNotEmpty(dnsServer)) {
+            // The provider URL is a space separated list of dns:// URLs, so white space
+            // around a server name (e.g. "a, b") must not end up inside a URL.
+            final StringBuilder providerUrl = new StringBuilder();
+            for (String server : dnsServer.split(",")) {
+                final String trimmed = server.trim();
+                if (!trimmed.isEmpty()) {
+                    providerUrl.append("dns://").append(trimmed).append("/. ");
+                }
+            }
+            if (providerUrl.length() > 0) {
+                env.put(Context.PROVIDER_URL, providerUrl.toString());
+            }
+        }
+
+        try {
+            initializeContext(env);
+        } catch (NamingException e) {
+            // Make sure a failed re-initialization is not mistaken for a usable resolver
+            initialized.set(false);
+            getLogger().error("Could not initialize JNDI context", e);
+        }
+    }
+
+
+    /**
+     * This method performs a simple DNS lookup using JNDI
+     * @param queryInput String containing the query body itself (e.g. 4.3.3.1.in-addr.arpa);
+     * @param queryType String containing the query type (e.g. TXT);
+     * @return the attributes returned by the context, or a single NXDOMAIN
+     *         attribute under queryType when the name was not found
+     * @throws NamingException for any lookup failure other than NameNotFoundException
+     */
+    protected Attributes doLookup(String queryInput, String queryType) throws NamingException {
+        try {
+            // Uses pre-existing context to resolve
+            return ictx.getAttributes(queryInput, new String[]{queryType});
+        } catch ( NameNotFoundException e) {
+            getLogger().debug("Resolution for domain {} failed due to {}", new Object[]{queryInput, e});
+            return new BasicAttributes(queryType, NXDOMAIN, true);
+        }
+    }
+
+    // This was separated from main code to ease the creation of test units injecting fake JNDI data
+    // back into the processor.
+    protected void initializeContext(Hashtable<String,String> env) throws NamingException {
+        this.ictx = new InitialDirContext(env);
+        // Only reached when the context was created without an exception
+        initialized.set(true);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/d6a2409d/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9b1be71..e4a39f5 100644
--- 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.nifi.processors.GeoEnrichIP
\ No newline at end of file
+org.apache.nifi.processors.GeoEnrichIP
+org.apache.nifi.processors.enrich.QueryDNS

http://git-wip-us.apache.org/repos/asf/nifi/blob/d6a2409d/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/FakeDNSInitialDirContextFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/FakeDNSInitialDirContextFactory.java
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/FakeDNSInitialDirContextFactory.java
new file mode 100644
index 0000000..a00dab6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/FakeDNSInitialDirContextFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.enrich;
+
+import org.mockito.Mockito;
+
+import javax.naming.Context;
+import javax.naming.NamingException;
+import javax.naming.directory.DirContext;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
+
+
+/**
+ * Test helper {@link InitialContextFactory} that answers every request for an
+ * initial context with a fresh Mockito mock of {@link DirContext} and remembers
+ * the most recent one so a test can stub it.
+ *
+ * All access to the remembered mock is synchronized on this class.
+ */
+public class FakeDNSInitialDirContextFactory implements InitialContextFactory {
+    private static DirContext mockContext = null;
+
+    /**
+     * @return the mock handed out most recently, creating one if none exists yet
+     */
+    public static synchronized DirContext getLatestMockContext() {
+        if (mockContext == null) {
+            mockContext = Mockito.mock(DirContext.class);
+        }
+        return mockContext;
+    }
+
+    /**
+     * Creates a new mock context and records it as the latest one.
+     *
+     * @param environment ignored
+     * @return a newly created mock {@link DirContext}
+     */
+    @Override
+    public Context getInitialContext(Hashtable<?, ?> environment) throws NamingException {
+        return createMockContext();
+    }
+
+    // Replaces the remembered mock; synchronized so the value returned is the one stored
+    private static synchronized DirContext createMockContext() {
+        mockContext = Mockito.mock(DirContext.class);
+        return mockContext;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/d6a2409d/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java
 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java
new file mode 100644
index 0000000..65c1a50
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/test/java/org/apache/nifi/processors/enrich/TestQueryDNS.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.enrich;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+import javax.naming.Context;
+import javax.naming.directory.Attributes;
+import javax.naming.directory.BasicAttribute;
+import javax.naming.directory.BasicAttributes;
+import javax.naming.directory.DirContext;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link QueryDNS}. The processor is wired to
+ * {@link FakeDNSInitialDirContextFactory}, and the mocked directory context is
+ * stubbed to answer every attribute lookup with a canned reply (see
+ * {@link #craftResponse(InvocationOnMock)}).
+ */
+public class TestQueryDNS {
+    /** Value of the {@code ip_address} attribute carried by the first enqueued flow file. */
+    private static final String SAMPLE_IP_ADDRESS = "123.123.123.123";
+
+    private QueryDNS queryDNS;
+    private TestRunner queryDNSTestRunner;
+
+    /**
+     * Creates a fresh processor and runner, initializes the processor with the
+     * fake JNDI factory and stubs the mocked context's attribute lookup.
+     */
+    @Before
+    public void setupTest() throws Exception {
+        this.queryDNS = new QueryDNS();
+        this.queryDNSTestRunner = TestRunners.newTestRunner(queryDNS);
+
+        final Hashtable<String, String> env = new Hashtable<>();
+        env.put(Context.INITIAL_CONTEXT_FACTORY, FakeDNSInitialDirContextFactory.class.getName());
+
+        this.queryDNS.initializeContext(env);
+
+        final DirContext mockContext = FakeDNSInitialDirContextFactory.getLatestMockContext();
+
+        // Intercept JNDI's getAttributes call, which receives the (String)
+        // queryValue and the (String[]) queryType.
+        Mockito.when(mockContext.getAttributes(Mockito.anyString(), Mockito.any(String[].class)))
+                .thenAnswer(new Answer<Attributes>() {
+                    @Override
+                    public Attributes answer(InvocationOnMock invocation) throws Throwable {
+                        // Craft a false DNS response. The queried name is
+                        // ignored; only the requested record type selects
+                        // which synthetic reply is generated.
+                        return craftResponse(invocation);
+                    }
+                });
+    }
+
+    /** A PTR lookup with no parser exposes the raw record as group0. */
+    @Test
+    public void testVanillaQueryWithoutSplit() {
+        configureQuery("PTR", reversedAddressQuery(".in-addr.arpa"));
+        queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.NONE.getValue());
+
+        enqueueSampleFlowFilesAndRun();
+
+        queryDNSTestRunner.assertTransferCount(QueryDNS.REL_FOUND, 1);
+        final List<MockFlowFile> results = queryDNSTestRunner.getFlowFilesForRelationship(QueryDNS.REL_FOUND);
+        final String result = results.get(0).getAttribute("enrich.dns.record0.group0");
+
+        // Guard against a missing attribute so the test fails with a message
+        // instead of a NullPointerException.
+        assertTrue("enrich.dns.record0.group0 should contain apache.nifi.org but was: " + result,
+                result != null && result.contains("apache.nifi.org"));
+    }
+
+    /** A TXT lookup parsed with the split parser exposes each field as its own group. */
+    @Test
+    public void testValidDataWithSplit() {
+        configureQuery("TXT", reversedAddressQuery(".origin.asn.nifi.apache.org"));
+        queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.SPLIT.getValue());
+        queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER_INPUT, "\\|");
+
+        enqueueSampleFlowFilesAndRun();
+
+        queryDNSTestRunner.assertTransferCount(QueryDNS.REL_FOUND, 1);
+        final List<MockFlowFile> results = queryDNSTestRunner.getFlowFilesForRelationship(QueryDNS.REL_FOUND);
+
+        results.get(0).assertAttributeEquals("enrich.dns.record0.group5", " Apache NiFi");
+    }
+
+    /** A TXT lookup parsed with the regex parser exposes the captured text as group0. */
+    @Test
+    public void testValidDataWithRegex() {
+        configureQuery("TXT", reversedAddressQuery(".origin.asn.nifi.apache.org"));
+        queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.REGEX.getValue());
+        queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER_INPUT, "\\.*(\\sApache\\sNiFi)$");
+
+        enqueueSampleFlowFilesAndRun();
+
+        queryDNSTestRunner.assertTransferCount(QueryDNS.REL_FOUND, 1);
+        final List<MockFlowFile> results = queryDNSTestRunner.getFlowFilesForRelationship(QueryDNS.REL_FOUND);
+
+        results.get(0).assertAttributeEquals("enrich.dns.record0.group0", " Apache NiFi");
+    }
+
+    /** An AAAA lookup, whose canned reply carries no values, is routed to "not found". */
+    @Test
+    public void testInvalidData() {
+        configureQuery("AAAA", "nifi.apache.org");
+
+        enqueueSampleFlowFilesAndRun();
+
+        queryDNSTestRunner.assertTransferCount(QueryDNS.REL_NOT_FOUND, 1);
+    }
+
+    /** The parser and its input must be configured consistently for the processor to be valid. */
+    @Test
+    public void testCustomValidator() {
+        configureQuery("AAAA", "nifi.apache.org");
+        // Note the absence of a QUERY_PARSER_INPUT value
+        queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.REGEX.getValue());
+        queryDNSTestRunner.assertNotValid();
+
+        // Note the presence of a QUERY_PARSER_INPUT value
+        queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.REGEX.getValue());
+        queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER_INPUT, "\\|");
+        queryDNSTestRunner.assertValid();
+
+        // Note the presence of a QUERY_PARSER_INPUT value while NONE is set
+        queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER, QueryDNS.NONE.getValue());
+        queryDNSTestRunner.setProperty(QueryDNS.QUERY_PARSER_INPUT, "\\|");
+        queryDNSTestRunner.assertNotValid();
+    }
+
+    /**
+     * Sets the properties shared by every test: record type, one retry, a
+     * timeout of 1000 and the query input.
+     *
+     * @param queryType DNS record type to request, e.g. {@code "PTR"}
+     * @param queryInput value (possibly expression language) to look up
+     */
+    private void configureQuery(String queryType, String queryInput) {
+        queryDNSTestRunner.setProperty(QueryDNS.DNS_QUERY_TYPE, queryType);
+        queryDNSTestRunner.setProperty(QueryDNS.DNS_RETRIES, "1");
+        queryDNSTestRunner.setProperty(QueryDNS.DNS_TIMEOUT, "1000");
+        queryDNSTestRunner.setProperty(QueryDNS.QUERY_INPUT, queryInput);
+    }
+
+    /**
+     * Builds an expression that emits the four dot-separated fields of the
+     * {@code ip_address} attribute in reverse order, followed by the given suffix.
+     *
+     * @param zoneSuffix text appended after the reversed fields, including its leading dot
+     * @return the expression language string for the query input
+     */
+    private static String reversedAddressQuery(String zoneSuffix) {
+        return "${ip_address:getDelimitedField(4, '.'):trim()}"
+                + ".${ip_address:getDelimitedField(3, '.'):trim()}"
+                + ".${ip_address:getDelimitedField(2, '.'):trim()}"
+                + ".${ip_address:getDelimitedField(1, '.'):trim()}"
+                + zoneSuffix;
+    }
+
+    /**
+     * Enqueues one empty flow file carrying the {@code ip_address} attribute and
+     * one attribute-less flow file with text content, then runs the processor
+     * for a single iteration.
+     */
+    private void enqueueSampleFlowFilesAndRun() {
+        final Map<String, String> attributeMap = new HashMap<>();
+        attributeMap.put("ip_address", SAMPLE_IP_ADDRESS);
+
+        queryDNSTestRunner.enqueue(new byte[0], attributeMap);
+        queryDNSTestRunner.enqueue("teste teste teste chocolate".getBytes());
+
+        // One iteration, stop on finish, no initialization.
+        // NOTE(review): initialization is presumably skipped so the fake
+        // context installed in setupTest is not replaced - confirm against QueryDNS.
+        queryDNSTestRunner.run(1, true, false);
+    }
+
+    /**
+     * Dummy pseudo-DNS responder: builds a canned reply chosen by the first
+     * requested record type.
+     *
+     * @param invocation the intercepted {@code getAttributes} call; its second
+     *                   argument is the array of requested record types
+     * @return the synthetic attributes; empty for an unrecognized record type
+     */
+    private Attributes craftResponse(InvocationOnMock invocation) {
+        final Object[] arguments = invocation.getArguments();
+        final String[] queryType = (String[]) arguments[1];
+
+        // Attribute IDs are matched case-insensitively.
+        final Attributes attrs = new BasicAttributes(true);
+
+        switch (queryType[0]) {
+            case "AAAA":
+                // Attribute present but without any value.
+                attrs.put(new BasicAttribute("AAAA"));
+                break;
+            case "TXT":
+                attrs.put(new BasicAttribute("TXT",
+                        "666 | 123.123.123.123/32 | Apache-NIFI | AU | nifi.org | Apache NiFi"));
+                break;
+            case "PTR": {
+                final BasicAttribute ptr = new BasicAttribute("PTR");
+                ptr.add(0, "eg-apache.nifi.org.");
+                ptr.add(1, "apache.nifi.org.");
+                attrs.put(ptr);
+                break;
+            }
+            default:
+                // Unrecognized record type: reply with an empty attribute set.
+                break;
+        }
+        return attrs;
+    }
+}
+

Reply via email to