[
https://issues.apache.org/jira/browse/NIFI-1971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15436689#comment-15436689
]
ASF GitHub Bot commented on NIFI-1971:
--------------------------------------
Github user trixpan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/858#discussion_r76225290
--- Diff:
nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/src/main/java/org/apache/nifi/processors/enrich/QueryWhois.java
---
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.enrich;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.net.whois.WhoisClient;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"whois", "enrich", "ip"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("A powerful whois query processor primary designed
to enrich DataFlows with whois based APIs " +
+ "(e.g. ShadowServer's ASN lookup) but that can be also used to
perform regular whois lookups.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "enrich.dns.record*.group*",
description = "The captured fields of the Whois query response for each of the
records received"),
+})
+public class QueryWhois extends AbstractEnrichProcessor {
+
+ public static final AllowableValue BEGIN_END = new
AllowableValue("Begin/End", "Begin/End",
+ "The evaluated input of each flowfile is enclosed within begin
and end tags. Each row contains a delimited set of fields");
+
+ public static final AllowableValue BULK_NONE = new
AllowableValue("None", "None",
+ "Queries are made without any particular dialect");
+
+
+ public static final PropertyDescriptor WHOIS_QUERY_TYPE = new
PropertyDescriptor.Builder()
+ .name("WHOIS_QUERY_TYPE")
+ .displayName("Whois Query Type")
+ .description("The Whois query type to be used by the processor
(if used)")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor WHOIS_SERVER = new
PropertyDescriptor.Builder()
+ .name("WHOIS_SERVER")
+ .displayName("Whois Server")
+ .description("The Whois server to be used")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor WHOIS_SERVER_PORT = new
PropertyDescriptor.Builder()
+ .name("WHOIS_SERVER_PORT")
+ .displayName("Whois Server Port")
+ .description("The TCP port of the remote Whois server")
+ .required(true)
+ .defaultValue("43")
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor WHOIS_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("WHOIS_TIMEOUT")
+ .displayName("Whois Query Timeout")
+ .description("The amount of time to wait until considering a
query as failed")
+ .required(true)
+ .defaultValue("1500 ms")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("BATCH_SIZE")
+ .displayName("Batch Size")
+ .description("The number of incoming FlowFiles to process in a
single execution of this processor. ")
+ .required(true)
+ .defaultValue("25")
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BULK_PROTOCOL = new
PropertyDescriptor.Builder()
+ .name("BULK_PROTOCOL")
+ .displayName("Bulk Protocol")
+ .description("The protocol used to perform the bulk query. ")
+ .required(true)
+ .defaultValue(BULK_NONE.getValue())
+ .allowableValues(BEGIN_END, BULK_NONE)
+ .build();
+
+ @Override
+ public List<ValidationResult> customValidate(ValidationContext
validationContext) {
+ final List<ValidationResult> results = new
ArrayList<>(super.customValidate(validationContext));
+
+ final String chosenQUERY_PARSER =
validationContext.getProperty(QUERY_PARSER).getValue();
+
+ if (!chosenQUERY_PARSER.equals(NONE.getValue()) &&
!validationContext.getProperty(QUERY_PARSER_INPUT).isSet() ) {
+ results.add(new
ValidationResult.Builder().input("QUERY_PARSER_INPUT")
+ .explanation("Split and Regex parsers require a valid
Regular Expression")
+ .valid(false)
+ .build());
+ }
+
+
+ if (validationContext.getProperty(BATCH_SIZE).asInteger() > 1 &&
!validationContext.getProperty(KEY_GROUP).isSet() ) {
+ results.add(new ValidationResult.Builder().input("BATCH_SIZE")
+ .explanation("When operating in Batching mode, RegEx
and Split parsers require a Key lookup group")
+ .valid(false)
+ .build());
+ }
+
+ if ( validationContext.getProperty(BATCH_SIZE).asInteger() > 1 &&
chosenQUERY_PARSER.equals(NONE.getValue()) ) {
+ results.add(new
ValidationResult.Builder().input("QUERY_PARSER")
+ .explanation("NONE parser does not support batching.
Configure Batch Size to 1 or use another parser.")
+ .valid(false)
+ .build());
+ }
+
+ if ( validationContext.getProperty(BATCH_SIZE).asInteger() == 1
&&
!validationContext.getProperty(BULK_PROTOCOL).getValue().equals(BULK_NONE.getValue())
) {
+ results.add(new
ValidationResult.Builder().input("BULK_PROTOCOL")
+ .explanation("Bulk protocol requirement requires
batching. Configure Batch Size to more than 1 or use another protocol.")
+ .valid(false)
+ .build());
+ }
+
+
+
+ return results;
+ }
+
+ private final static List<PropertyDescriptor> propertyDescriptors;
+ private final static Set<Relationship> relationships;
+
+ private WhoisClient whoisClient;
+
+ static {
+ List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(QUERY_INPUT);
+ props.add(WHOIS_QUERY_TYPE);
+ props.add(WHOIS_SERVER);
+ props.add(WHOIS_SERVER_PORT);
+ props.add(WHOIS_TIMEOUT);
+ props.add(BATCH_SIZE);
+ props.add(BULK_PROTOCOL);
+ props.add(QUERY_PARSER);
+ props.add(QUERY_PARSER_INPUT);
+ props.add(KEY_GROUP);
+ propertyDescriptors = Collections.unmodifiableList(props);
+
+ Set<Relationship> rels = new HashSet<>();
+ rels.add(REL_FOUND);
+ rels.add(REL_NOT_FOUND);
+ relationships = Collections.unmodifiableSet(rels);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+ List<FlowFile> flowFiles = session.get(batchSize);
+
+ if (flowFiles == null || flowFiles.isEmpty()) {
+ //
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
+ context.yield();
+ return;
+ }
+
+ // Build query
+ String buildString = "";
+ final String queryType =
context.getProperty(WHOIS_QUERY_TYPE).getValue();
+
+ // Verify the protocol mode and craft the "begin"
pseudo-command, otherwise just the query type
+ buildString =
context.getProperty(BULK_PROTOCOL).getValue().equals("Begin/End") ?
buildString.concat("begin") : buildString.concat("");
--- End diff --
addressed
> Create a batch capable pseudo-whois ("netcat") enrichment Processor
> -------------------------------------------------------------------
>
> Key: NIFI-1971
> URL: https://issues.apache.org/jira/browse/NIFI-1971
> Project: Apache NiFi
> Issue Type: Improvement
> Affects Versions: 1.0.0, 0.7.0
> Reporter: Andre
> Assignee: Andre
> Fix For: 1.1.0
>
>
> While the QueryDNS processor can be used for low to medium volume enrichment
> and for licensed DNS-based lookups (e.g. commercial use of SpamHaus), many
> enrichment providers prefer bulk queries made through a pseudo-whois API
> (a.k.a. netcat interface).
> As documented
> [here|https://www.shadowserver.org/wiki/pmwiki.php/Services/IP-BGP#toc6], the
> bulk interfaces work by connecting to port 43/TCP and sending a payload like:
> {code}
> begin origin
> 4.5.4.3
> 17.112.152.32
> 208.77.188.166
> end
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)