http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java ---------------------------------------------------------------------- diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java index 0000000,a755b1a..3ac55d2 mode 000000,100644..100644 --- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java +++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java @@@ -1,0 -1,498 +1,498 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashSet; + import java.util.LinkedHashMap; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.TreeSet; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.atomic.AtomicReference; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.PropertyValue; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.components.Validator; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.loading.LoadDistributionListener; + import org.apache.nifi.loading.LoadDistributionService; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.processor.annotation.CapabilityDescription; + import org.apache.nifi.processor.annotation.EventDriven; + import org.apache.nifi.processor.annotation.OnScheduled; + import org.apache.nifi.processor.annotation.SideEffectFree; + import org.apache.nifi.processor.annotation.SupportsBatching; + import org.apache.nifi.processor.annotation.Tags; + import org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable; + import org.apache.nifi.processor.util.StandardValidators; + + import org.apache.commons.lang3.StringUtils; + + @EventDriven + @SideEffectFree + @SupportsBatching + @TriggerWhenAnyDestinationAvailable + @Tags({"distribute", "load balance", "route", "round robin", "weighted"}) + @CapabilityDescription("Distributes FlowFiles to downstream processors based on a Distribution Strategy. 
If using the Round Robin " + + "strategy, the default is to assign each destination a weighting of 1 (evenly distributed). However, optional properties" + + "can be added to the change this; adding a property with the name '5' and value '10' means that the relationship with name " + + "'5' will be receive 10 FlowFiles in each iteration instead of 1.") + public class DistributeLoad extends AbstractProcessor { + + public static final String STRATEGY_ROUND_ROBIN = "round robin"; + public static final String STRATEGY_NEXT_AVAILABLE = "next available"; + public static final String STRATEGY_LOAD_DISTRIBUTION_SERVICE = "load distribution service"; + + public static final PropertyDescriptor NUM_RELATIONSHIPS = new PropertyDescriptor.Builder() + .name("Number of Relationships") + .description("Determines the number of Relationships to which the load should be distributed") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .build(); + public static final PropertyDescriptor DISTRIBUTION_STRATEGY = new PropertyDescriptor.Builder() + .name("Distribution Strategy") + .description( + "Determines how the load will be distributed. If using Round Robin, will not distribute any FlowFiles unless all destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 destination can accept FlowFiles.") + .required(true) + .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE) + .defaultValue(STRATEGY_ROUND_ROBIN) + .build(); + + public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder() + .name("Hostnames") + .description("List of remote servers to distribute across. 
Each server must be FQDN and use either ',', ';', or [space] as a delimiter") + .required(true) + .addValidator(new Validator() { + + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + ValidationResult result = new ValidationResult.Builder() + .subject(subject) + .valid(true) + .input(input) + .explanation("Good FQDNs") + .build(); + if (null == input) { + result = new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Need to specify delimited list of FQDNs") + .build(); + return result; + } + String[] hostNames = input.split("(?:,+|;+|\\s+)"); + for (String hostName : hostNames) { + if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) { + result = new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Need a FQDN rather than a simple host name.") + .build(); + return result; + } + } + return result; + } + }) + .build(); + public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder() + .name("Load Distribution Service ID") + .description("The identifier of the Load Distribution Service") + .required(true) + .identifiesControllerService(LoadDistributionService.class) + .build(); + + private List<PropertyDescriptor> properties; + private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>(); + private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<DistributionStrategy>(new RoundRobinStrategy()); + private final AtomicReference<List<Relationship>> weightedRelationshipListRef = new AtomicReference<>(); + private final AtomicBoolean doCustomValidate = new AtomicBoolean(false); + private volatile LoadDistributionListener myListener; + private final AtomicBoolean doSetProps = new AtomicBoolean(true); + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> 
relationships = new HashSet<>(); + relationships.add(createRelationship(1)); + relationshipsRef.set(Collections.unmodifiableSet(relationships)); + + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(NUM_RELATIONSHIPS); + properties.add(DISTRIBUTION_STRATEGY); + this.properties = Collections.unmodifiableList(properties); + } + + private static Relationship createRelationship(final int num) { + return new Relationship.Builder().name(String.valueOf(num)).build(); + } + + @Override + public Set<Relationship> getRelationships() { + return relationshipsRef.get(); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (descriptor.equals(NUM_RELATIONSHIPS)) { + final Set<Relationship> relationships = new HashSet<>(); + for (int i = 1; i <= Integer.parseInt(newValue); i++) { + relationships.add(createRelationship(i)); + } + this.relationshipsRef.set(Collections.unmodifiableSet(relationships)); + } else if (descriptor.equals(DISTRIBUTION_STRATEGY)) { + switch (newValue.toLowerCase()) { + case STRATEGY_ROUND_ROBIN: + strategyRef.set(new RoundRobinStrategy()); + break; + case STRATEGY_NEXT_AVAILABLE: + strategyRef.set(new NextAvailableStrategy()); + break; + case STRATEGY_LOAD_DISTRIBUTION_SERVICE: + strategyRef.set(new LoadDistributionStrategy()); + } + doSetProps.set(true); + doCustomValidate.set(true); + } + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + if (strategyRef.get() instanceof LoadDistributionStrategy && doSetProps.getAndSet(false)) { + final List<PropertyDescriptor> props = new ArrayList<>(properties); + props.add(LOAD_DISTRIBUTION_SERVICE_TEMPLATE); + props.add(HOSTNAMES); + this.properties = Collections.unmodifiableList(props); + } else if (doSetProps.getAndSet(false)) { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(NUM_RELATIONSHIPS); + props.add(DISTRIBUTION_STRATEGY); + 
this.properties = Collections.unmodifiableList(props); + } + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + // validate that the property name is valid. + final int numRelationships = this.relationshipsRef.get().size(); + try { + final int value = Integer.parseInt(propertyDescriptorName); + if (value <= 0 || value > numRelationships) { + return new PropertyDescriptor.Builder().addValidator(new InvalidPropertyNameValidator(propertyDescriptorName)) + .name(propertyDescriptorName).build(); + } + } catch (final NumberFormatException e) { + return new PropertyDescriptor.Builder().addValidator(new InvalidPropertyNameValidator(propertyDescriptorName)) + .name(propertyDescriptorName).build(); + } + + // validate that the property value is valid + return new PropertyDescriptor.Builder().addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .name(propertyDescriptorName).dynamic(true).build(); + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + Collection<ValidationResult> results = new ArrayList<>(); + if (doCustomValidate.getAndSet(false)) { + String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue(); + if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) { + // make sure Hostnames and Controller service are set + PropertyValue propDesc = validationContext.getProperty(HOSTNAMES); + if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) { + results.add(new ValidationResult.Builder() + .subject(HOSTNAMES.getName()) + .explanation("Must specify Hostnames when using 'Load Distribution Strategy'") + .valid(false) + .build()); + } + propDesc = validationContext.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE); + if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) { + results.add(new ValidationResult.Builder() + 
.subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName()) + .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Strategy'") + .valid(false) + .build()); + } + if (results.isEmpty()) { + int numRels = validationContext.getProperty(NUM_RELATIONSHIPS).asInteger(); + String hostNamesValue = validationContext.getProperty(HOSTNAMES).getValue(); + String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)"); + int numHosts = 0; + for (String hostName : hostNames) { + if (StringUtils.isNotBlank(hostName)) { + hostNames[numHosts++] = hostName; + } + } + if (numHosts > numRels) { + results.add(new ValidationResult.Builder() + .subject("Number of Relationships and Hostnames") + .explanation("Number of Relationships must be equal to, or greater than, the number of host names") + .valid(false) + .build()); + } else { + // create new relationships with descriptions of hostname + Set<Relationship> relsWithDesc = new TreeSet<>(); + for (int i = 0; i < numHosts; i++) { + relsWithDesc.add(new Relationship.Builder().name(String.valueOf(i + 1)).description(hostNames[i]).build()); + } + // add add'l rels if configuration requires it...it probably shouldn't + for (int i = numHosts + 1; i <= numRels; i++) { + relsWithDesc.add(createRelationship(i)); + } + relationshipsRef.set(Collections.unmodifiableSet(relsWithDesc)); + } + } + } + } + return results; + } + + @OnScheduled + public void createWeightedList(final ProcessContext context) { + final Map<Integer, Integer> weightings = new LinkedHashMap<>(); + + String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue(); + if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) { + String hostNamesValue = context.getProperty(HOSTNAMES).getValue(); + String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)"); + Set<String> hostNameSet = new HashSet<>(); + for (String hostName : hostNames) { + if (StringUtils.isNotBlank(hostName)) { + hostNameSet.add(hostName); + } + } + LoadDistributionService svc 
= context.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE).asControllerService(LoadDistributionService.class); + myListener = new LoadDistributionListener() { + + @Override + public void update(Map<String, Integer> loadInfo) { + for (Relationship rel : relationshipsRef.get()) { + String hostname = rel.getDescription(); + Integer weight = 1; + if (loadInfo.containsKey(hostname)) { + weight = loadInfo.get(hostname); + } + weightings.put(Integer.decode(rel.getName()), weight); + } + updateWeightedRelationships(weightings); + } + }; + + Map<String, Integer> loadInfo = svc.getLoadDistribution(hostNameSet, myListener); + for (Relationship rel : relationshipsRef.get()) { + String hostname = rel.getDescription(); + Integer weight = 1; + if (loadInfo.containsKey(hostname)) { + weight = loadInfo.get(hostname); + } + weightings.put(Integer.decode(rel.getName()), weight); + } + + } else { + final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger(); + for (int i = 1; i <= numRelationships; i++) { + weightings.put(i, 1); + } + for (final PropertyDescriptor propDesc : context.getProperties().keySet()) { + if (!this.properties.contains(propDesc)) { + final int relationship = Integer.parseInt(propDesc.getName()); + final int weighting = context.getProperty(propDesc).asInteger(); + weightings.put(relationship, weighting); + } + } + } + updateWeightedRelationships(weightings); + } + + private void updateWeightedRelationships(final Map<Integer, Integer> weightings) { + final List<Relationship> relationshipList = new ArrayList<>(); + for (final Map.Entry<Integer, Integer> entry : weightings.entrySet()) { + final String relationshipName = String.valueOf(entry.getKey()); + final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); + for (int i = 0; i < entry.getValue(); i++) { + relationshipList.add(relationship); + } + } + + this.weightedRelationshipListRef.set(Collections.unmodifiableList(relationshipList)); + } + + @Override + 
public void onTrigger(final ProcessContext context, final ProcessSession session) { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final DistributionStrategy strategy = strategyRef.get(); - final Set<Relationship> available = session.getAvailableRelationships(); ++ final Set<Relationship> available = context.getAvailableRelationships(); + final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger(); + final boolean allDestinationsAvailable = (available.size() == numRelationships); + if (!allDestinationsAvailable && strategy.requiresAllDestinationsAvailable()) { + return; + } + - final Relationship relationship = strategy.mapToRelationship(session, flowFile); ++ final Relationship relationship = strategy.mapToRelationship(context, flowFile); + if (relationship == null) { + // can't transfer the FlowFiles. Roll back and yield + session.rollback(); + context.yield(); + return; + } + + session.transfer(flowFile, relationship); + session.getProvenanceReporter().route(flowFile, relationship); + } + + private static class InvalidPropertyNameValidator implements Validator { + + private final String propertyName; + + public InvalidPropertyNameValidator(final String propertyName) { + this.propertyName = propertyName; + } + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext validationContext) { + return new ValidationResult.Builder().subject("Property Name").input(propertyName) + .explanation("Property Name must be a positive integer between 1 and the number of relationships (inclusive)") + .valid(false) + .build(); + } + } + + /** + * Implementations must be thread-safe. + */ + private static interface DistributionStrategy { + + /** + * Returns a mapping of FlowFile to Relationship or <code>null</code> if + * the needed relationships are not available to accept files. 
+ * + * @param session + * @param flowFiles + * @return + */ - Relationship mapToRelationship(ProcessSession session, FlowFile flowFile); ++ Relationship mapToRelationship(ProcessContext context, FlowFile flowFile); + + boolean requiresAllDestinationsAvailable(); + } + + private class LoadDistributionStrategy implements DistributionStrategy { + + private final AtomicLong counter = new AtomicLong(0L); + + @Override - public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) { ++ public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { + final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); + final int numRelationships = relationshipList.size(); + + // create a HashSet that contains all of the available relationships, as calling #contains on HashSet + // is much faster than calling it on a List + boolean foundFreeRelationship = false; + Relationship relationship = null; + + int attempts = 0; + while (!foundFreeRelationship) { + final long counterValue = counter.getAndIncrement(); + final int idx = (int) (counterValue % numRelationships); + relationship = relationshipList.get(idx); - foundFreeRelationship = session.getAvailableRelationships().contains(relationship); ++ foundFreeRelationship = context.getAvailableRelationships().contains(relationship); + if (++attempts % numRelationships == 0 && !foundFreeRelationship) { + return null; + } + } + + return relationship; + } + + @Override + public boolean requiresAllDestinationsAvailable() { + return false; + } + + } + + private class RoundRobinStrategy implements DistributionStrategy { + + private final AtomicLong counter = new AtomicLong(0L); + + @Override - public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) { ++ public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { + final List<Relationship> relationshipList = 
DistributeLoad.this.weightedRelationshipListRef.get(); + final long counterValue = counter.getAndIncrement(); + final int idx = (int) (counterValue % relationshipList.size()); + final Relationship relationship = relationshipList.get(idx); + return relationship; + } + + @Override + public boolean requiresAllDestinationsAvailable() { + return true; + } + } + + private class NextAvailableStrategy implements DistributionStrategy { + + private final AtomicLong counter = new AtomicLong(0L); + + @Override - public Relationship mapToRelationship(final ProcessSession session, final FlowFile flowFile) { ++ public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) { + final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get(); + final int numRelationships = relationshipList.size(); + + // create a HashSet that contains all of the available relationships, as calling #contains on HashSet + // is much faster than calling it on a List + boolean foundFreeRelationship = false; + Relationship relationship = null; + + int attempts = 0; + while (!foundFreeRelationship) { + final long counterValue = counter.getAndIncrement(); + final int idx = (int) (counterValue % numRelationships); + relationship = relationshipList.get(idx); - foundFreeRelationship = session.getAvailableRelationships().contains(relationship); ++ foundFreeRelationship = context.getAvailableRelationships().contains(relationship); + if (++attempts % numRelationships == 0 && !foundFreeRelationship) { + return null; + } + } + + return relationship; + } + + @Override + public boolean requiresAllDestinationsAvailable() { + return false; + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java ---------------------------------------------------------------------- diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index 0000000,2b0b437..b7fe97a mode 000000,100644..100644 --- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@@ -1,0 -1,321 +1,323 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicReference; + import java.util.regex.Pattern; + + import javax.servlet.Servlet; + import javax.ws.rs.Path; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.stream.io.LeakyBucketStreamThrottler; + import org.apache.nifi.stream.io.StreamThrottler; + import org.apache.nifi.processor.AbstractSessionFactoryProcessor; + import org.apache.nifi.processor.DataUnit; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessSessionFactory; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.processor.annotation.CapabilityDescription; + import org.apache.nifi.processor.annotation.OnScheduled; + import org.apache.nifi.processor.annotation.OnStopped; + import org.apache.nifi.processor.annotation.Tags; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.processors.standard.servlets.ContentAcknowledgmentServlet; + import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet; + import org.apache.nifi.ssl.SSLContextService; + + import org.eclipse.jetty.server.Connector; + import org.eclipse.jetty.server.HttpConfiguration; + import org.eclipse.jetty.server.HttpConnectionFactory; + import org.eclipse.jetty.server.SecureRequestCustomizer; + import org.eclipse.jetty.server.Server; + import org.eclipse.jetty.server.ServerConnector; + import org.eclipse.jetty.server.SslConnectionFactory; + import 
org.eclipse.jetty.servlet.ServletContextHandler; + import org.eclipse.jetty.util.ssl.SslContextFactory; + import org.eclipse.jetty.util.thread.QueuedThreadPool; + + @Tags({"ingest", "http", "https", "rest", "listen"}) + @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The URL of the Service will be http://{hostname}:{port}/contentListener") + public class ListenHTTP extends AbstractSessionFactoryProcessor { + + private Set<Relationship> relationships; + private List<PropertyDescriptor> properties; + + public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder() + .name("success") + .description("Relationship for successfully received FlowFiles") + .build(); + + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .name("Listening Port") + .description("The Port to listen on for incoming connections") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder() + .name("Authorized DN Pattern") + .description("A Regular Expression to apply against the Distinguished Name of incoming connections. 
If the Pattern does not match the DN, the connection will be refused.") + .required(true) + .defaultValue(".*") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_UNCONFIRMED_TIME = new PropertyDescriptor.Builder() + .name("Max Unconfirmed Flowfile Time") + .description("The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache") + .required(true) + .defaultValue("60 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_DATA_RATE = new PropertyDescriptor.Builder() + .name("Max Data to Receive per Second") + .description("The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + public static final PropertyDescriptor HEADERS_AS_ATTRIBUTES_REGEX = new PropertyDescriptor.Builder() + .name("HTTP Headers to receive as Attributes (Regex)") + .description("Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .required(false) + .build(); + + public static final String URI = "/contentListener"; + public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor"; + public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger"; + public static final String CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER = "sessionFactoryHolder"; ++ public static final String 
CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER = "processContextHolder"; + public static final String CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN = "authorityPattern"; + public static final String CONTEXT_ATTRIBUTE_HEADER_PATTERN = "headerPattern"; + public static final String CONTEXT_ATTRIBUTE_FLOWFILE_MAP = "flowFileMap"; + public static final String CONTEXT_ATTRIBUTE_STREAM_THROTTLER = "streamThrottler"; + + private volatile Server server = null; + private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<String, FlowFileEntryTimeWrapper>(); + private final AtomicReference<ProcessSessionFactory> sessionFactoryReference = new AtomicReference<>(); + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(RELATIONSHIP_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(PORT); + descriptors.add(MAX_DATA_RATE); + descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(AUTHORIZED_DN_PATTERN); + descriptors.add(MAX_UNCONFIRMED_TIME); + descriptors.add(HEADERS_AS_ATTRIBUTES_REGEX); + this.properties = Collections.unmodifiableList(descriptors); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @OnStopped + public void shutdownHttpServer() { + final Server toShutdown = this.server; + if (toShutdown == null) { + return; + } + + try { + toShutdown.stop(); + toShutdown.destroy(); + } catch (final Exception ex) { + getLogger().warn("unable to cleanly shutdown embedded server due to {}", new Object[]{ex}); + this.server = null; + } + } + + private void createHttpServerFromService(final ProcessContext context) throws Exception { + final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final Double maxBytesPerSecond = context.getProperty(MAX_DATA_RATE).asDataSize(DataUnit.B); + final StreamThrottler streamThrottler = (maxBytesPerSecond == null) ? null : new LeakyBucketStreamThrottler(maxBytesPerSecond.intValue()); + + final boolean needClientAuth = sslContextService == null ? false : sslContextService.getTrustStoreFile() != null; + + final SslContextFactory contextFactory = new SslContextFactory(); + contextFactory.setNeedClientAuth(needClientAuth); + + if (needClientAuth) { + contextFactory.setTrustStorePath(sslContextService.getTrustStoreFile()); + contextFactory.setTrustStoreType(sslContextService.getTrustStoreType()); + contextFactory.setTrustStorePassword(sslContextService.getTrustStorePassword()); + } + + final String keystorePath = sslContextService == null ? null : sslContextService.getKeyStoreFile(); + if (keystorePath != null) { + final String keystorePassword = sslContextService.getKeyStorePassword(); + final String keyStoreType = sslContextService.getKeyStoreType(); + + contextFactory.setKeyStorePath(keystorePath); + contextFactory.setKeyManagerPassword(keystorePassword); + contextFactory.setKeyStorePassword(keystorePassword); + contextFactory.setKeyStoreType(keyStoreType); + } + + // thread pool for the jetty instance + final QueuedThreadPool threadPool = new QueuedThreadPool(); + threadPool.setName(String.format("%s (%s) Web Server", getClass().getSimpleName(), getIdentifier())); + + // create the server instance + final Server server = new Server(threadPool); + + // get the configured port + final int port = context.getProperty(PORT).asInteger(); + + final ServerConnector connector; + final HttpConfiguration httpConfiguration = new HttpConfiguration(); + if (keystorePath == null) { + // create the connector + connector = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration)); + } else { + // configure the ssl connector + 
httpConfiguration.setSecureScheme("https"); + httpConfiguration.setSecurePort(port); + httpConfiguration.addCustomizer(new SecureRequestCustomizer()); + + // build the connector + connector = new ServerConnector(server, + new SslConnectionFactory(contextFactory, "http/1.1"), + new HttpConnectionFactory(httpConfiguration)); + } + + // configure the port + connector.setPort(port); + + // add the connector to the server + server.setConnectors(new Connector[]{connector}); + + final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null)); + for (final Class<? extends Servlet> cls : getServerClasses()) { + final Path path = cls.getAnnotation(Path.class); + if (path == null) { + contextHandler.addServlet(cls, "/*"); + } else { + contextHandler.addServlet(cls, path.value()); + } + } + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESSOR, this); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_LOGGER, getLogger()); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_SESSION_FACTORY_HOLDER, sessionFactoryReference); ++ contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PROCESS_CONTEXT_HOLDER, context); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_FLOWFILE_MAP, flowFileMap); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN, Pattern.compile(context.getProperty(AUTHORIZED_DN_PATTERN).getValue())); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_STREAM_THROTTLER, streamThrottler); + + if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) { + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue())); + } + server.start(); + + this.server = server; + } + + @OnScheduled + public void createHttpServer(final ProcessContext context) throws Exception { + createHttpServerFromService(context); + } + + protected Set<Class<? extends Servlet>> getServerClasses() { + final Set<Class<? 
extends Servlet>> s = new HashSet<>(); + s.add(ListenHTTPServlet.class); + s.add(ContentAcknowledgmentServlet.class); + return s; + } + + private Set<String> findOldFlowFileIds(final ProcessContext ctx) { + final Set<String> old = new HashSet<>(); + + final long expiryMillis = ctx.getProperty(MAX_UNCONFIRMED_TIME).asTimePeriod(TimeUnit.MILLISECONDS); + final long cutoffTime = System.currentTimeMillis() - expiryMillis; + for (final Map.Entry<String, FlowFileEntryTimeWrapper> entry : flowFileMap.entrySet()) { + final FlowFileEntryTimeWrapper wrapper = entry.getValue(); + if (wrapper != null && wrapper.getEntryTime() < cutoffTime) { + old.add(entry.getKey()); + } + } + + return old; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { + sessionFactoryReference.compareAndSet(null, sessionFactory); + + for (final String id : findOldFlowFileIds(context)) { + final FlowFileEntryTimeWrapper wrapper = flowFileMap.remove(id); + if (wrapper != null) { + getLogger().warn("failed to received acknowledgment for HOLD with ID {}; rolling back session", new Object[]{id}); + wrapper.session.rollback(); + } + } + + context.yield(); + } + + public static class FlowFileEntryTimeWrapper { + + private final Set<FlowFile> flowFiles; + private final long entryTime; + private final ProcessSession session; + + public FlowFileEntryTimeWrapper(final ProcessSession session, final Set<FlowFile> flowFiles, final long entryTime) { + this.flowFiles = flowFiles; + this.entryTime = entryTime; + this.session = session; + } + + public Set<FlowFile> getFlowFiles() { + return flowFiles; + } + + public long getEntryTime() { + return entryTime; + } + + public ProcessSession getSession() { + return session; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java 
---------------------------------------------------------------------- diff --cc nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java index 0000000,65b3c66..43d8395 mode 000000,100644..100644 --- a/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java +++ b/nifi/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java @@@ -1,0 -1,627 +1,627 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.IOException; + import java.net.InetAddress; + import java.net.NetworkInterface; + import java.net.SocketException; + import java.net.UnknownHostException; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.Enumeration; + import java.util.HashSet; + import java.util.List; + import java.util.Set; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.Callable; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.Future; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicReference; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReentrantLock; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.components.Validator; + import org.apache.nifi.expression.AttributeExpression; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.io.nio.BufferPool; + import org.apache.nifi.io.nio.ChannelListener; + import org.apache.nifi.io.nio.consumer.StreamConsumer; + import org.apache.nifi.io.nio.consumer.StreamConsumerFactory; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractSessionFactoryProcessor; + import org.apache.nifi.processor.DataUnit; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessSessionFactory; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.processor.annotation.CapabilityDescription; + import 
org.apache.nifi.processor.annotation.OnScheduled; + import org.apache.nifi.processor.annotation.OnStopped; + import org.apache.nifi.processor.annotation.OnUnscheduled; + import org.apache.nifi.processor.annotation.Tags; + import org.apache.nifi.processor.annotation.TriggerWhenEmpty; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.processors.standard.util.UDPStreamConsumer; + import org.apache.nifi.util.Tuple; + + import org.apache.commons.lang3.StringUtils; + + /** + * <p> + * This processor listens for Datagram Packets on a given port and concatenates + * the contents of those packets together generating flow files roughly as often + * as the internal buffer fills up or until no more data is currently available. + * </p> + * + * <p> + * This processor has the following required properties: + * <ul> + * <li><b>Port</b> - The port to listen on for data packets. Must be known by + * senders of Datagrams.</li> + * <li><b>Receive Timeout</b> - The time out period when waiting to receive data + * from the socket. Specify units. Default is 5 secs.</li> + * <li><b>Max Buffer Size</b> - Determines the size each receive buffer may be. + * Specify units. Default is 1 MB.</li> + * <li><b>FlowFile Size Trigger</b> - Determines the (almost) upper bound size + * at which a flow file would be generated. A flow file will get made even if + * this value isn't reached if there is no more data streaming in and this value + * may be exceeded by the size of a single packet. Specify units. Default is 1 + * MB.</li> + * <li><b>Max size of UDP Buffer</b> - The maximum UDP buffer size that should + * be used. This is a suggestion to the Operating System to indicate how big the + * udp socket buffer should be. Specify units. Default is 1 MB.</li> + * <li><b>Receive Buffer Count</b> - Number of receiving buffers to be used to + * accept data from the socket. 
Higher numbers means more ram is allocated but + * can allow better throughput. Default is 4.</li> + * <li><b>Channel Reader Interval</b> - Scheduling interval for each read + * channel. Specify units. Default is 50 millisecs.</li> + * <li><b>FlowFiles Per Session</b> - The number of flow files per session. + * Higher number is more efficient, but will lose more data if a problem occurs + * that causes a rollback of a session. Default is 10</li> + * </ul> + * </p> + * + * This processor has the following optional properties: + * <ul> + * <li><b>Sending Host</b> - IP, or name, of a remote host. Only Datagrams from + * the specified Sending Host Port and this host will be accepted. Improves + * Performance. May be a system property or an environment variable.</li> + * <li><b>Sending Host Port</b> - Port being used by remote host to send + * Datagrams. Only Datagrams from the specified Sending Host and this port will + * be accepted. Improves Performance. May be a system property or an environment + * variable.</li> + * </ul> + * </p> + * + * <p> + * The following relationships are required: + * <ul> + * <li><b>success</b> - Where to route newly created flow files.</li> + * </ul> + * </p> + * + */ + @TriggerWhenEmpty + @Tags({"ingest", "udp", "listen", "source"}) + @CapabilityDescription("Listens for Datagram Packets on a given port and concatenates the contents of those packets " + + "together generating flow files") + public class ListenUDP extends AbstractSessionFactoryProcessor { + + private static final Set<Relationship> relationships; + private static final List<PropertyDescriptor> properties; + + // relationships. 
/** The single relationship of this processor; every FlowFile built from received datagrams is transferred to it. */
public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("Connection which contains concatenated Datagram Packets")
        .build();

static {
    final Set<Relationship> rels = new HashSet<>();
    rels.add(RELATIONSHIP_SUCCESS);
    relationships = Collections.unmodifiableSet(rels);
}

// required properties.

/** Local UDP port the processor listens on. */
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
        .name("Port")
        .description("Port to listen on. Must be known by senders of Datagrams.")
        .addValidator(StandardValidators.PORT_VALIDATOR)
        .required(true)
        .build();

/** Time out applied when waiting to receive data from the socket. */
public static final PropertyDescriptor RECV_TIMEOUT = new PropertyDescriptor.Builder()
        .name("Receive Timeout")
        .description("The time out period when waiting to receive data from the socket. Specify units.")
        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
        .defaultValue("5 secs")
        .required(true)
        .build();

/** Size of each receive buffer held by the buffer pool. */
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
        .name("Max Buffer Size")
        .description("Determines the size each receive buffer may be")
        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
        .defaultValue("1 MB")
        .required(true)
        .build();

/** Approximate size at which the stream consumer closes one FlowFile and starts the next. */
public static final PropertyDescriptor FLOW_FILE_SIZE_TRIGGER = new PropertyDescriptor.Builder()
        .name("FlowFile Size Trigger")
        .description("Determines the (almost) upper bound size at which a flow file would be generated.")
        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
        .defaultValue("1 MB")
        .required(true)
        .build();

/** Requested size of the operating system's UDP socket buffer. */
public static final PropertyDescriptor MAX_UDP_BUFFER = new PropertyDescriptor.Builder()
        .name("Max size of UDP Buffer")
        .description("The maximum UDP buffer size that should be used. This is a suggestion to the Operating System "
                + "to indicate how big the udp socket buffer should be.")
        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
        .defaultValue("1 MB")
        .required(true)
        .build();

/** Number of receive buffers allocated in the buffer pool. */
public static final PropertyDescriptor RECV_BUFFER_COUNT = new PropertyDescriptor.Builder()
        .name("Receive Buffer Count")
        .description("Number of receiving buffers to be used to accept data from the socket. Higher numbers "
                + "means more ram is allocated but can allow better throughput.")
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .defaultValue("4")
        .required(true)
        .build();

/** Scheduling period of the channel readers; also used as the idle sleep of the consumer task. */
public static final PropertyDescriptor CHANNEL_READER_PERIOD = new PropertyDescriptor.Builder()
        .name("Channel Reader Interval")
        .description("Scheduling interval for each read channel.")
        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
        .defaultValue("50 ms")
        .required(true)
        .build();

/** Number of FlowFiles accumulated in one session before the session is handed over for transfer. */
public static final PropertyDescriptor FLOW_FILES_PER_SESSION = new PropertyDescriptor.Builder()
        .name("FlowFiles Per Session")
        .description("The number of flow files per session.")
        .required(true)
        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
        .defaultValue("10")
        .build();

// optional properties.

/** Optional remote host; must be configured together with {@link #SENDING_HOST_PORT} (see {@link #customValidate}). */
public static final PropertyDescriptor SENDING_HOST = new PropertyDescriptor.Builder()
        .name("Sending Host")
        .description("IP, or name, of a remote host. Only Datagrams from the specified Sending Host Port and this host will "
                + "be accepted. Improves Performance. May be a system property or an environment variable.")
        .addValidator(new HostValidator())
        .expressionLanguageSupported(true)
        .build();

/** Optional remote port; must be configured together with {@link #SENDING_HOST} (see {@link #customValidate}). */
public static final PropertyDescriptor SENDING_HOST_PORT = new PropertyDescriptor.Builder()
        .name("Sending Host Port")
        .description("Port being used by remote host to send Datagrams. Only Datagrams from the specified Sending Host and "
                + "this port will be accepted. Improves Performance. May be a system property or an environment variable.")
        .addValidator(StandardValidators.PORT_VALIDATOR)
        .expressionLanguageSupported(true)
        .build();

/** Names of the network interfaces the JVM reported when this class was loaded. */
private static final Set<String> interfaceSet = new HashSet<>();

static {
    try {
        final Enumeration<NetworkInterface> interfaceEnum = NetworkInterface.getNetworkInterfaces();
        // getNetworkInterfaces() may hand back null when the machine reports no interface at all.
        while (interfaceEnum != null && interfaceEnum.hasMoreElements()) {
            interfaceSet.add(interfaceEnum.nextElement().getName());
        }
    } catch (SocketException ignored) {
        // Best effort only: no logger is available during class initialisation. The set simply
        // stays empty (or partial), so the "Local Network Interface" validator rejects unknown names.
    }
}

/**
 * Tells whether {@code name} is one of the interface names collected in {@link #interfaceSet}.
 * The exact spelling is tried first because the set holds the names exactly as reported by the
 * JVM; the lower-cased spelling is tried as well so that values accepted by the previous
 * (lower-case only) check are still accepted.
 *
 * @param name candidate interface name, may be {@code null}
 * @return {@code true} if the name (or its lower-case form) is a known interface name
 */
private static boolean isKnownInterfaceName(final String name) {
    if (name == null) {
        return false;
    }
    return interfaceSet.contains(name) || interfaceSet.contains(name.toLowerCase(java.util.Locale.ROOT));
}

/**
 * Optional name of the local interface to bind to. The validator accepts the raw input when it
 * is a known interface name; otherwise it compiles the input as an attribute expression and
 * accepts it when the evaluated value is a known interface name.
 */
public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder()
        .name("Local Network Interface")
        .description("The name of a local network interface to be used to restrict listening for UDP Datagrams to a specific LAN. "
                + "May be a system property or an environment variable.")
        .addValidator(new Validator() {
            @Override
            public ValidationResult validate(String subject, String input, ValidationContext context) {
                final ValidationResult valid = new ValidationResult.Builder()
                        .subject("Local Network Interface")
                        .valid(true)
                        .input(input)
                        .build();
                if (isKnownInterfaceName(input)) {
                    return valid;
                }

                String message;
                try {
                    final AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input);
                    final String realValue = ae.evaluate();
                    if (isKnownInterfaceName(realValue)) {
                        return valid;
                    }

                    message = realValue + " is not a valid network name. Valid names are " + interfaceSet.toString();

                } catch (IllegalArgumentException e) {
                    message = "Not a valid AttributeExpression: " + e.getMessage();
                }
                return new ValidationResult.Builder()
                        .subject("Local Network Interface")
                        .valid(false)
                        .input(input)
                        .explanation(message)
                        .build();
            }
        })
        .expressionLanguageSupported(true)
        .build();

static {
    final List<PropertyDescriptor> props = new ArrayList<>();
    props.add(SENDING_HOST);
    props.add(SENDING_HOST_PORT);
    props.add(NETWORK_INTF_NAME);
    props.add(CHANNEL_READER_PERIOD);
    props.add(FLOW_FILE_SIZE_TRIGGER);
    props.add(MAX_BUFFER_SIZE);
    props.add(MAX_UDP_BUFFER);
    props.add(PORT);
    props.add(RECV_BUFFER_COUNT);
    props.add(FLOW_FILES_PER_SESSION);
    props.add(RECV_TIMEOUT);
    properties = Collections.unmodifiableList(props);
}

// defaults
public static final int DEFAULT_LISTENING_THREADS = 2;
// lock used to protect channelListener
private final Lock lock = new ReentrantLock();
private volatile ChannelListener channelListener = null;
// sessions (with the flow files created in them) that are ready to be transferred and committed
private final BlockingQueue<Tuple<ProcessSession, List<FlowFile>>> flowFilesPerSessionQueue = new LinkedBlockingQueue<>();
// list handed to every UDPStreamConsumer; the consumer task snapshots and clears it per session
private final List<FlowFile> newFlowFiles = new ArrayList<>();
private final AtomicReference<UDPStreamConsumer> consumerRef = new AtomicReference<>();
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final AtomicReference<ProcessSessionFactory> sessionFactoryRef = new AtomicReference<>();
private final ExecutorService consumerExecutorService = Executors.newSingleThreadExecutor();
private final AtomicReference<Future<Tuple<ProcessSession, List<FlowFile>>>> consumerFutureRef = new AtomicReference<>();
// set by the consumer task when its consumer finished while the processor is still running
private final AtomicBoolean resetChannelListener = new AtomicBoolean(false);
// instance attribute for provenance receive event generation
private volatile String sendingHost;

@Override
public Set<Relationship> getRelationships() {
    return relationships;
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return properties;
}

/**
 * Creates the ChannelListener and submits the background task that drives the
 * {@link UDPStreamConsumer} and groups the flow files it produces into sessions.
 * <p>
 * The task runs until {@link #stopping()} raises the {@code stopping} flag. Its result is the
 * last (possibly {@code null}) session together with the flow files not yet queued for transfer.
 * </p>
 *
 * @param context the process context the property values are read from
 * @throws IOException if the channel listener cannot be created
 */
@OnScheduled
public void initializeChannelListenerAndConsumerProcessing(final ProcessContext context) throws IOException {
    getChannelListener(context);
    stopping.set(false);
    final Future<Tuple<ProcessSession, List<FlowFile>>> consumerFuture = consumerExecutorService
            .submit(new Callable<Tuple<ProcessSession, List<FlowFile>>>() {

                @Override
                public Tuple<ProcessSession, List<FlowFile>> call() {
                    final int maxFlowFilesPerSession = context.getProperty(FLOW_FILES_PER_SESSION).asInteger();
                    final long channelReaderIntervalMSecs = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
                    // number of waits in 5 secs, or 1. The divisor is clamped to 1 so that an
                    // interval of 0 ms cannot raise an ArithmeticException and kill this task.
                    final int maxWaits = (int) (channelReaderIntervalMSecs <= 1000 ? 5000 / Math.max(1L, channelReaderIntervalMSecs) : 1);
                    final ProcessorLog logger = getLogger();
                    int flowFileCount = maxFlowFilesPerSession;
                    ProcessSession session = null;
                    int numWaits = 0;
                    while (!stopping.get()) {
                        final UDPStreamConsumer consumer = consumerRef.get();
                        if (consumer == null || sessionFactoryRef.get() == null) {
                            // nothing to drive yet: no consumer was created or onTrigger has not run
                            try {
                                Thread.sleep(100L);
                            } catch (InterruptedException ignored) {
                                // the loop is controlled by the stopping flag; re-asserting the
                                // interrupt here would make every later sleep return immediately
                            }
                        } else {
                            try {
                                // first time through, flowFileCount is maxFlowFilesPerSession so that a session
                                // is created and the consumer is updated with it.
                                if (flowFileCount == maxFlowFilesPerSession || numWaits == maxWaits) {
                                    logger.debug("Have waited {} times", new Object[]{numWaits});
                                    numWaits = 0;
                                    if (session != null) {
                                        final Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = new Tuple<ProcessSession, List<FlowFile>>(
                                                session,
                                                new ArrayList<>(newFlowFiles));
                                        newFlowFiles.clear();
                                        flowFilesPerSessionQueue.add(flowFilesPerSession);
                                    }
                                    session = sessionFactoryRef.get().createSession();
                                    consumer.setSession(session);
                                    flowFileCount = 0;
                                }
                                // this will throttle the processing of the received datagrams. If there are no more
                                // buffers to read into because none have been returned to the pool via consumer.process(),
                                // then the desired back pressure on the channel is created.
                                if (context.getAvailableRelationships().size() > 0) {
                                    consumer.process();
                                    if (flowFileCount == newFlowFiles.size()) {
                                        // no new datagrams received, need to throttle this thread back so it does
                                        // not consume all cpu...but don't want to cause back pressure on the channel
                                        // so the sleep time is same as the reader interval
                                        // If have done this for approx. 5 secs, assume datagram sender is down. So, push
                                        // out the remaining flow files (see numWaits == maxWaits above)
                                        Thread.sleep(channelReaderIntervalMSecs);
                                        if (flowFileCount > 0) {
                                            numWaits++;
                                        }
                                    } else {
                                        flowFileCount = newFlowFiles.size();
                                    }
                                } else {
                                    logger.debug("Creating back pressure...no available destinations");
                                    Thread.sleep(1000L);
                                }
                            } catch (final IOException ioe) {
                                logger.error("Unable to fully process consumer {}", new Object[]{consumer}, ioe);
                            } catch (InterruptedException ignored) {
                                // don't care: the stopping flag is re-checked at the top of the loop
                            } finally {
                                if (consumer.isConsumerFinished()) {
                                    logger.info("Consumer {} was closed and is finished", new Object[]{consumer});
                                    consumerRef.set(null);
                                    disconnect();
                                    if (!stopping.get()) {
                                        // ask onTrigger to build a fresh listener
                                        resetChannelListener.set(true);
                                    }
                                }
                            }
                        }
                    }
                    // when shutting down, need consumer to drain rest of cached buffers and clean up.
                    // prior to getting here, the channelListener was shutdown
                    UDPStreamConsumer consumer;
                    while ((consumer = consumerRef.get()) != null && !consumer.isConsumerFinished()) {
                        try {
                            consumer.process();
                        } catch (IOException ignored) {
                            // if this is blown...consumer.isConsumerFinished will be true
                        }
                    }
                    return new Tuple<ProcessSession, List<FlowFile>>(session, new ArrayList<>(newFlowFiles));
                }
            });
    consumerFutureRef.set(consumerFuture);
}

/**
 * Shuts down and forgets the current channel listener, if there is one. Does nothing when the
 * lock is currently held elsewhere ({@code tryLock} is used, not a blocking lock).
 */
private void disconnect() {
    if (lock.tryLock()) {
        try {
            if (channelListener != null) {
                getLogger().debug("Shutting down channel listener {}", new Object[]{channelListener});
                channelListener.shutdown(500L, TimeUnit.MILLISECONDS);
                channelListener = null;
            }
        } finally {
            lock.unlock();
        }
    }
}

/**
 * Builds a new channel listener from the configured properties and registers the datagram
 * channel on it. Does nothing when the lock is currently held elsewhere.
 * <p>
 * The optional local interface is resolved before the listener is created, and the listener is
 * only published in {@code channelListener} once the channel was added. If registration fails
 * the new listener is shut down again, so a retry from {@link #onTrigger} does not leave a
 * half-initialised listener behind.
 * </p>
 *
 * @param context the process context the property values are read from
 * @throws IOException if the configured local interface does not exist or has no address, or if
 *                     looking the interface up fails
 */
private void getChannelListener(final ProcessContext context) throws IOException {
    if (lock.tryLock()) {
        try {
            final ProcessorLog logger = getLogger();
            logger.debug("Instantiating a new channel listener");
            final int port = context.getProperty(PORT).asInteger();
            final int bufferCount = context.getProperty(RECV_BUFFER_COUNT).asInteger();
            final Double bufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B);
            final Double rcvBufferSize = context.getProperty(MAX_UDP_BUFFER).asDataSize(DataUnit.B);
            sendingHost = context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
            final Integer sendingHostPort = context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
            final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
            final Double flowFileSizeTrigger = context.getProperty(FLOW_FILE_SIZE_TRIGGER).asDataSize(DataUnit.B);
            final int recvTimeoutMS = context.getProperty(RECV_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
            final int readerMilliseconds = context.getProperty(CHANNEL_READER_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS).intValue();

            // Resolve the local address first: getByName() returns null for an unknown name and an
            // interface may have no address, both of which previously surfaced as runtime exceptions.
            InetAddress nicIPAddress = null;
            if (null != nicIPAddressStr) {
                final NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr);
                if (netIF == null) {
                    throw new IOException("Local Network Interface '" + nicIPAddressStr + "' could not be found");
                }
                final Enumeration<InetAddress> addresses = netIF.getInetAddresses();
                if (!addresses.hasMoreElements()) {
                    throw new IOException("Local Network Interface '" + nicIPAddressStr + "' has no IP address");
                }
                nicIPAddress = addresses.nextElement();
            }

            final StreamConsumerFactory consumerFactory = new StreamConsumerFactory() {

                @Override
                public StreamConsumer newInstance(final String streamId) {
                    final UDPStreamConsumer consumer = new UDPStreamConsumer(streamId, newFlowFiles, flowFileSizeTrigger.intValue(), getLogger());
                    consumerRef.set(consumer);
                    return consumer;
                }
            };
            final BufferPool bufferPool = new BufferPool(bufferCount, bufferSize.intValue(), false, Integer.MAX_VALUE);
            final ChannelListener listener = new ChannelListener(DEFAULT_LISTENING_THREADS, consumerFactory, bufferPool, recvTimeoutMS, TimeUnit.MILLISECONDS);
            boolean registered = false;
            try {
                // specifying a sufficiently low number for each stream to be fast enough though very efficient
                listener.setChannelReaderSchedulingPeriod(readerMilliseconds, TimeUnit.MILLISECONDS);
                listener.addDatagramChannel(nicIPAddress, port, rcvBufferSize.intValue(), sendingHost, sendingHostPort);
                registered = true;
            } finally {
                if (!registered) {
                    // do not keep a listener around that never got its channel
                    listener.shutdown(500L, TimeUnit.MILLISECONDS);
                }
            }
            channelListener = listener;
            logger.info("Registered service and initialized UDP socket listener. Now listening on port " + port + "...");
        } finally {
            lock.unlock();
        }
    }
}

/**
 * Enforces that Sending Host and Sending Host Port are either both set or both blank.
 *
 * @param validationContext the context providing the configured property values
 * @return the validation failures found; empty when the combination is consistent
 */
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
    final Collection<ValidationResult> result = new ArrayList<>();
    final String sendingHost = validationContext.getProperty(SENDING_HOST).getValue();
    final String sendingPort = validationContext.getProperty(SENDING_HOST_PORT).getValue();
    if (StringUtils.isBlank(sendingHost) && StringUtils.isNotBlank(sendingPort)) {
        result.add(new ValidationResult.Builder()
                .subject(SENDING_HOST.getName())
                .valid(false)
                .explanation("Must specify Sending Host when specifying Sending Host Port")
                .build());
    } else if (StringUtils.isBlank(sendingPort) && StringUtils.isNotBlank(sendingHost)) {
        result.add(new ValidationResult.Builder()
                .subject(SENDING_HOST_PORT.getName())
                .valid(false)
                .explanation("Must specify Sending Host Port when specifying Sending Host")
                .build());
    }
    return result;
}

/**
 * Remembers the session factory for the consumer task, rebuilds the channel listener when the
 * consumer task requested it, and transfers at most one queued session worth of flow files.
 *
 * @param context        the process context
 * @param sessionFactory the factory the consumer task creates its sessions from
 * @throws ProcessException declared by the overridden method
 */
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
    final ProcessorLog logger = getLogger();
    sessionFactoryRef.compareAndSet(null, sessionFactory);
    if (resetChannelListener.getAndSet(false) && !stopping.get()) {
        try {
            getChannelListener(context);
        } catch (IOException e) {
            logger.error("Tried to reset Channel Listener and failed due to:", e);
            // try again on the next trigger
            resetChannelListener.set(true);
        }
    }

    transferFlowFiles();
}

/**
 * Waits up to 100 ms for a queued session, transfers its flow files to
 * {@link #RELATIONSHIP_SUCCESS} and commits the session. On any failure the session is rolled
 * back and the failure is rethrown.
 *
 * @return {@code true} if a queued session was found and its flow files were transferred
 */
private boolean transferFlowFiles() {
    final ProcessorLog logger = getLogger();
    Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = null;
    boolean transferred = false;
    try {
        flowFilesPerSession = flowFilesPerSessionQueue.poll(100L, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // nothing was taken from the queue; keep the interrupt visible to the calling thread
        Thread.currentThread().interrupt();
    }
    if (flowFilesPerSession != null) {
        final ProcessSession session = flowFilesPerSession.getKey();
        final List<FlowFile> flowFiles = flowFilesPerSession.getValue();
        final String sourceSystem = sendingHost == null ? "Unknown" : sendingHost;
        try {
            for (final FlowFile flowFile : flowFiles) {
                session.getProvenanceReporter().receive(flowFile, sourceSystem);
                session.transfer(flowFile, RELATIONSHIP_SUCCESS);
            }
            logger.info("Transferred flow files {} to success", new Object[]{flowFiles});
            transferred = true;

            // need to check for erroneous flow files in input queue
            final List<FlowFile> existingFlowFiles = session.get(10);
            for (final FlowFile existingFlowFile : existingFlowFiles) {
                if (existingFlowFile != null && existingFlowFile.getSize() > 0) {
                    session.transfer(existingFlowFile, RELATIONSHIP_SUCCESS);
                    logger.warn("Found flow file in input queue (shouldn't have). Transferred flow file {} to success",
                            new Object[]{existingFlowFile});
                } else if (existingFlowFile != null) {
                    session.remove(existingFlowFile);
                    logger.warn("Found empty flow file in input queue (shouldn't have). Removed flow file {}", new Object[]{existingFlowFile});
                }
            }
            session.commit();
        } catch (Throwable t) {
            session.rollback();
            logger.error("Failed to transfer flow files or commit session...rolled back", t);
            throw t;
        }
    }
    return transferred;
}

/**
 * Shuts the listener down, stops the consumer task, and transfers whatever the task and the
 * queue still hold.
 * <p>
 * The consumer task may end without ever having opened a session (its result then carries a
 * {@code null} session); that case is skipped instead of dereferencing the missing session.
 * An interrupt received while waiting for the task is re-asserted after the queue was drained.
 * </p>
 */
@OnUnscheduled
public void stopping() {
    getLogger().debug("Stopping Processor");
    disconnect();
    stopping.set(true);
    final Future<Tuple<ProcessSession, List<FlowFile>>> future = consumerFutureRef.getAndSet(null);
    if (future == null) {
        return;
    }

    boolean interrupted = false;
    try {
        final Tuple<ProcessSession, List<FlowFile>> flowFilesPerSession = future.get();
        final ProcessSession lastSession = flowFilesPerSession.getKey();
        final boolean hasFlowFiles = flowFilesPerSession.getValue().size() > 0;
        if (lastSession == null) {
            // the consumer task never created a session, so there is nothing to commit
            if (hasFlowFiles) {
                getLogger().warn("Consumer returned flow files without a session; they cannot be transferred");
            }
        } else if (hasFlowFiles) {
            getLogger().debug("Draining remaining flow Files when stopping");
            flowFilesPerSessionQueue.add(flowFilesPerSession);
        } else {
            // need to close out the session that has no flow files
            lastSession.commit();
        }
    } catch (InterruptedException e) {
        interrupted = true;
        getLogger().error("Failure in cleaning up!", e);
    } catch (ExecutionException e) {
        getLogger().error("Failure in cleaning up!", e);
    }

    boolean moreFiles = true;
    while (moreFiles) {
        try {
            moreFiles = transferFlowFiles();
        } catch (Throwable t) {
            // the failed session was already rolled back; keep draining the remaining ones
            getLogger().error("Problem transferring cached flowfiles", t);
        }
    }

    if (interrupted) {
        Thread.currentThread().interrupt();
    }
}

/** Forgets the session factory so that the next schedule cycle picks up the one passed to onTrigger. */
@OnStopped
public void stopped() {
    sessionFactoryRef.set(null);
}

/**
 * Validator that accepts an input if {@link InetAddress#getByName(String)} can resolve it.
 * NOTE(review): the raw input is resolved as-is although Sending Host supports the expression
 * language; confirm whether expressions are evaluated before this validator is invoked.
 */
public static class HostValidator implements Validator {

    @Override
    public ValidationResult validate(String subject, String input, ValidationContext context) {
        try {
            InetAddress.getByName(input);
            return new ValidationResult.Builder()
                    .subject(subject)
                    .valid(true)
                    .input(input)
                    .build();
        } catch (final UnknownHostException e) {
            return new ValidationResult.Builder()
                    .subject(subject)
                    .valid(false)
                    .input(input)
                    .explanation("Unknown host: " + e)
                    .build();
        }
    }

}

} // closes class ListenUDP, whose header precedes this block
