http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/HadoopValidators.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/HadoopValidators.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/HadoopValidators.java new file mode 100644 index 0000000..8e60b91 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/HadoopValidators.java @@ -0,0 +1,98 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.hadoop;

import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;

import java.io.File;

/**
 * Validators for Hadoop related processors.
 */
public interface HadoopValidators {

    /**
     * Validates that one or more files exist, as specified in a single property.
     * The property value is treated as a comma-separated list of paths; each entry
     * (after trimming surrounding whitespace) must name an existing regular file.
     */
    Validator ONE_OR_MORE_FILE_EXISTS_VALIDATOR = (subject, input, context) -> {
        for (final String filename : input.split(",")) {
            try {
                final File file = new File(filename.trim());
                final boolean valid = file.exists() && file.isFile();
                if (!valid) {
                    final String message = "File " + file + " does not exist or is not a file";
                    return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
                }
            } catch (final SecurityException e) {
                // A SecurityManager may forbid probing the path; report the failure rather than propagate.
                final String message = "Unable to access " + filename + " due to " + e.getMessage();
                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(message).build();
            }
        }
        return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
    };

    /**
     * Validates that a property is a valid umask, i.e. a short octal number that is not negative.
     */
    Validator UMASK_VALIDATOR = (subject, value, context) -> {
        String reason = null;
        try {
            // Parse as octal (radix 8), matching how HDFS interprets the umask value.
            final short shortVal = Short.parseShort(value, 8);
            if (shortVal < 0) {
                reason = "octal umask [" + value + "] cannot be negative";
            } else if (shortVal > 511) {
                // HDFS umask has 9 bits: rwxrwxrwx ; the sticky bit cannot be umasked
                reason = "octal umask [" + value + "] is not a valid umask";
            }
        } catch (final NumberFormatException e) {
            reason = "[" + value + "] is not a valid short octal number";
        }
        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null)
                .build();
    };

    /**
     * Validates that a property is a valid short number greater than 0.
     */
    Validator POSITIVE_SHORT_VALIDATOR = (subject, value, context) -> {
        String reason = null;
        try {
            final short shortVal = Short.parseShort(value);
            if (shortVal <= 0) {
                reason = "short integer must be greater than zero";
            }
        } catch (final NumberFormatException e) {
            reason = "[" + value + "] is not a valid short integer";
        }
        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null)
                .build();
    };

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java new file mode 100644 index 0000000..8cd1ea1 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/java/org/apache/nifi/hadoop/TestKerberosProperties.java @@ -0,0 +1,90 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.logging.ComponentLog;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.File;
import java.util.List;

/**
 * Unit tests for {@link KerberosProperties}.
 */
public class TestKerberosProperties {

    @Test
    public void testWithKerberosConfigFile() {
        final File file = new File("src/test/resources/krb5.conf");

        final KerberosProperties kerberosProperties = new KerberosProperties(file);
        Assert.assertNotNull(kerberosProperties);

        Assert.assertNotNull(kerberosProperties.getKerberosConfigFile());
        Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator());
        Assert.assertNotNull(kerberosProperties.getKerberosPrincipal());
        Assert.assertNotNull(kerberosProperties.getKerberosKeytab());

        // With a config file present, the config validator should pass.
        final ValidationResult result = kerberosProperties.getKerberosConfigValidator().validate("test", "principal", null);
        Assert.assertTrue(result.isValid());
    }

    @Test
    public void testWithoutKerberosConfigFile() {
        // No config file supplied: the properties are still created, but the
        // config validator must report the configuration as invalid.
        final KerberosProperties kerberosProperties = new KerberosProperties(null);
        Assert.assertNotNull(kerberosProperties);

        Assert.assertNull(kerberosProperties.getKerberosConfigFile());
        Assert.assertNotNull(kerberosProperties.getKerberosConfigValidator());
        Assert.assertNotNull(kerberosProperties.getKerberosPrincipal());
        Assert.assertNotNull(kerberosProperties.getKerberosKeytab());

        final ValidationResult result = kerberosProperties.getKerberosConfigValidator().validate("test", "principal", null);
        Assert.assertFalse(result.isValid());
    }

    @Test
    public void testValidatePrincipalAndKeytab() {
        final ComponentLog log = Mockito.mock(ComponentLog.class);
        final Configuration config = new Configuration();

        // no security enabled in config so doesn't matter what principal and keytab are
        List<ValidationResult> results = KerberosProperties.validatePrincipalAndKeytab(
                "test", config, null, null, log);
        Assert.assertEquals(0, results.size());

        results = KerberosProperties.validatePrincipalAndKeytab(
                "test", config, "principal", null, log);
        Assert.assertEquals(0, results.size());

        results = KerberosProperties.validatePrincipalAndKeytab(
                "test", config, "principal", "keytab", log);
        Assert.assertEquals(0, results.size());

        // change the config to have kerberos turned on; now a missing principal
        // and keytab must each produce a validation error (two results total)
        config.set("hadoop.security.authentication", "kerberos");
        config.set("hadoop.security.authorization", "true");

        results = KerberosProperties.validatePrincipalAndKeytab(
                "test", config, null, null, log);
        Assert.assertEquals(2, results.size());
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/resources/krb5.conf ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/resources/krb5.conf b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/resources/krb5.conf new file mode 100644 index 0000000..814d5b2 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/test/resources/krb5.conf @@ -0,0 +1,12 @@ +[libdefaults] + default_realm = EXAMPLE.COM + +[realms] + EXAMPLE.COM = { + kdc = kdc1.example.com + kdc = kdc2.example.com + admin_server = kdc1.example.com + } + +[domain_realm] + .example.com = EXAMPLE.COM \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml new file mode 100644 index 0000000..f7bcb43 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml @@ -0,0 +1,67 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-extension-utils</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-processor-utils</artifactId> + <packaging>jar</packaging> + <description> + This nifi-processor-utils module is designed to capture common patterns + and utilities that can be leveraged by other processors or components to + help promote reuse. These patterns may become framework level features + or may simply be made available through this utility. It is ok for this + module to have dependencies but care should be taken when adding dependencies + as this increases the cost of utilizing this module in various nars. 
+ </description> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-security-utils</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <!-- Other modules using nifi-processor-utils are expected to have this API available, typically through a NAR dependency --> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java new file mode 100644 index 0000000..fdbc71f --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.bin; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; + +/** + * Note: {@code Bin} objects are NOT thread safe. If multiple threads access a {@code Bin}, the caller must synchronize + * access. + */ +public class Bin { + private final ProcessSession session; + private final long creationMomentEpochNs; + private final long minimumSizeBytes; + private final long maximumSizeBytes; + + private volatile int minimumEntries = 0; + private volatile int maximumEntries = Integer.MAX_VALUE; + private final String fileCountAttribute; + + final List<FlowFile> binContents = new ArrayList<>(); + long size; + int successiveFailedOfferings = 0; + + /** + * Constructs a new bin + * + * @param session the session + * @param minSizeBytes min bytes + * @param maxSizeBytes max bytes + * @param minEntries min entries + * @param maxEntries max entries + * @param fileCountAttribute num files + * @throws IllegalArgumentException if the min is not less than or equal to the max. 
+ */ + public Bin(final ProcessSession session, final long minSizeBytes, final long maxSizeBytes, final int minEntries, final int maxEntries, final String fileCountAttribute) { + this.session = session; + this.minimumSizeBytes = minSizeBytes; + this.maximumSizeBytes = maxSizeBytes; + this.minimumEntries = minEntries; + this.maximumEntries = maxEntries; + this.fileCountAttribute = fileCountAttribute; + + this.creationMomentEpochNs = System.nanoTime(); + if (minSizeBytes > maxSizeBytes) { + throw new IllegalArgumentException(); + } + } + + public ProcessSession getSession() { + return session; + } + + /** + * Indicates whether the bin has enough items to be considered full. This is based on whether the current size of the bin is greater than the minimum size in bytes and based on having a number of + * successive unsuccessful attempts to add a new item (because it is so close to the max or the size of the objects being attempted do not favor tight packing) + * + * @return true if considered full; false otherwise + */ + public boolean isFull() { + return (((size >= minimumSizeBytes) && binContents.size() >= minimumEntries) && (successiveFailedOfferings > 5)) + || (size >= maximumSizeBytes) || (binContents.size() >= maximumEntries); + } + + /** + * Indicates enough size exists to meet the minimum requirements + * + * @return true if full enough + */ + public boolean isFullEnough() { + return isFull() || (size >= minimumSizeBytes && (binContents.size() >= minimumEntries)); + } + + /** + * Determines if this bin is older than the time specified. 
+ * + * @param duration duration + * @param unit unit + * @return true if this bin is older than the length of time given; false otherwise + */ + public boolean isOlderThan(final int duration, final TimeUnit unit) { + final long ageInNanos = System.nanoTime() - creationMomentEpochNs; + return ageInNanos > TimeUnit.NANOSECONDS.convert(duration, unit); + } + + /** + * Determines if this bin is older than the specified bin + * + * @param other other bin + * @return true if this is older than given bin + */ + public boolean isOlderThan(final Bin other) { + return creationMomentEpochNs < other.creationMomentEpochNs; + } + + /** + * If this bin has enough room for the size of the given flow file then it is added otherwise it is not + * + * @param flowFile flowfile to offer + * @param session the ProcessSession to which the FlowFile belongs + * @return true if added; false otherwise + */ + public boolean offer(final FlowFile flowFile, final ProcessSession session) { + if (((size + flowFile.getSize()) > maximumSizeBytes) || (binContents.size() >= maximumEntries)) { + successiveFailedOfferings++; + return false; + } + + if (fileCountAttribute != null) { + final String countValue = flowFile.getAttribute(fileCountAttribute); + final Integer count = toInteger(countValue); + if (count != null) { + int currentMaxEntries = this.maximumEntries; + this.maximumEntries = Math.min(count, currentMaxEntries); + this.minimumEntries = currentMaxEntries; + } + } + + size += flowFile.getSize(); + + session.migrate(getSession(), Collections.singleton(flowFile)); + binContents.add(flowFile); + successiveFailedOfferings = 0; + return true; + } + + private static final Pattern intPattern = Pattern.compile("\\d+"); + + public Integer toInteger(final String value) { + if (value == null) { + return null; + } + if (!intPattern.matcher(value).matches()) { + return null; + } + + try { + return Integer.parseInt(value); + } catch (final Exception e) { + return null; + } + } + + /** + * @return the 
underlying list of flow files within this bin + */ + public List<FlowFile> getContents() { + return binContents; + } + + public long getBinAge() { + final long ageInNanos = System.nanoTime() - creationMomentEpochNs; + return TimeUnit.MILLISECONDS.convert(ageInNanos, TimeUnit.NANOSECONDS); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java new file mode 100644 index 0000000..67e37c2 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.bin; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +/** + * Base class for file-binning processors. + * + */ +public abstract class BinFiles extends AbstractSessionFactoryProcessor { + + public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder() + .name("Minimum Group Size") + .description("The minimum size of for the bundle") + .required(true) + .defaultValue("0 B") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder() + .name("Maximum Group Size") + .description("The maximum size for the bundle. 
If not specified, there is no maximum.") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor MIN_ENTRIES = new PropertyDescriptor.Builder() + .name("Minimum Number of Entries") + .description("The minimum number of files to include in a bundle") + .required(true) + .defaultValue("1") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder() + .name("Maximum Number of Entries") + .description("The maximum number of files to include in a bundle. If not specified, there is no maximum.") + .defaultValue("1000") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder() + .name("Maximum number of Bins") + .description("Specifies the maximum number of bins that can be held in memory at any one time") + .defaultValue("5") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder() + .name("Max Bin Age") + .description("The maximum age of a Bin that will trigger a Bin to be complete. 
Expected format is <duration> <time unit> " + + "where <duration> is a positive integer and time unit is one of seconds, minutes, hours") + .required(false) + .addValidator(StandardValidators.createTimePeriodValidator(1, TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS)) + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The FlowFiles that were used to create the bundle") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure") + .build(); + + private final BinManager binManager = new BinManager(); + private final Queue<Bin> readyBins = new LinkedBlockingQueue<>(); + + @OnStopped + public final void resetState() { + binManager.purge(); + + Bin bin; + while ((bin = readyBins.poll()) != null) { + bin.getSession().rollback(); + } + } + + /** + * Allows general pre-processing of a flow file before it is offered to a bin. This is called before getGroupId(). + * + * @param context context + * @param session session + * @param flowFile flowFile + * @return The flow file, possibly altered + */ + protected abstract FlowFile preprocessFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile); + + /** + * Returns a group ID representing a bin. This allows flow files to be binned into like groups. + * + * @param context context + * @param flowFile flowFile + * @return The appropriate group ID + */ + protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile); + + /** + * Performs any additional setup of the bin manager. Called during the OnScheduled phase. 
+ * + * @param binManager The bin manager + * @param context context + */ + protected abstract void setUpBinManager(BinManager binManager, ProcessContext context); + + /** + * Processes a single bin. Implementing class is responsible for committing each session + * + * @param unmodifiableBin A reference to a single bin of flow files + * @param context The context + * @return <code>true</code> if the input bin was already committed. E.g., in case of a failure, the implementation + * may choose to transfer all binned files to Failure and commit their sessions. If + * false, the processBins() method will transfer the files to Original and commit the sessions + * + * @throws ProcessException if any problem arises while processing a bin of FlowFiles. All flow files in the bin + * will be transferred to failure and the ProcessSession provided by the 'session' + * argument rolled back + */ + protected abstract boolean processBin(Bin unmodifiableBin, ProcessContext context) throws ProcessException; + + /** + * Allows additional custom validation to be done. This will be called from the parent's customValidation method. 
+ * + * @param context The context + * @return Validation results indicating problems + */ + protected Collection<ValidationResult> additionalCustomValidation(final ValidationContext context) { + return new ArrayList<>(); + } + + @Override + public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + final int totalBinCount = binManager.getBinCount() + readyBins.size(); + final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger(); + final int flowFilesBinned; + + if (totalBinCount < maxBinCount) { + flowFilesBinned = binFlowFiles(context, sessionFactory); + getLogger().debug("Binned {} FlowFiles", new Object[] {flowFilesBinned}); + } else { + flowFilesBinned = 0; + getLogger().debug("Will not bin any FlowFiles because {} bins already exist;" + + "will wait until bins have been emptied before any more are created", new Object[] {totalBinCount}); + } + + if (!isScheduled()) { + return; + } + + final int binsMigrated = migrateBins(context); + final int binsProcessed = processBins(context); + //If we accomplished nothing then let's yield + if (flowFilesBinned == 0 && binsMigrated == 0 && binsProcessed == 0) { + context.yield(); + } + } + + private int migrateBins(final ProcessContext context) { + int added = 0; + for (final Bin bin : binManager.removeReadyBins(true)) { + this.readyBins.add(bin); + added++; + } + + // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do + // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the + // bins. So we may as well expire it now. 
+ if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) { + final Bin bin = binManager.removeOldestBin(); + if (bin != null) { + added++; + this.readyBins.add(bin); + } + } + return added; + } + + private int processBins(final ProcessContext context) { + final Bin bin = readyBins.poll(); + if (bin == null) { + return 0; + } + + final List<Bin> bins = new ArrayList<>(); + bins.add(bin); + + final ComponentLog logger = getLogger(); + + boolean binAlreadyCommitted = false; + try { + binAlreadyCommitted = this.processBin(bin, context); + } catch (final ProcessException e) { + logger.error("Failed to process bundle of {} files due to {}", new Object[] {bin.getContents().size(), e}); + + final ProcessSession binSession = bin.getSession(); + for (final FlowFile flowFile : bin.getContents()) { + binSession.transfer(flowFile, REL_FAILURE); + } + binSession.commit(); + return 1; + } catch (final Exception e) { + logger.error("Failed to process bundle of {} files due to {}; rolling back sessions", new Object[] {bin.getContents().size(), e}); + + bin.getSession().rollback(); + return 1; + } + + // If this bin's session has been committed, move on. 
+ if (!binAlreadyCommitted) { + final ProcessSession binSession = bin.getSession(); + binSession.transfer(bin.getContents(), REL_ORIGINAL); + binSession.commit(); + } + + return 1; + } + + private int binFlowFiles(final ProcessContext context, final ProcessSessionFactory sessionFactory) { + int flowFilesBinned = 0; + while (binManager.getBinCount() <= context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) { + if (!isScheduled()) { + break; + } + + final ProcessSession session = sessionFactory.createSession(); + final List<FlowFile> flowFiles = session.get(1000); + if (flowFiles.isEmpty()) { + break; + } + + final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>(); + for (FlowFile flowFile : flowFiles) { + flowFile = this.preprocessFlowFile(context, session, flowFile); + final String groupingIdentifier = getGroupId(context, flowFile); + flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile); + } + + for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) { + final Set<FlowFile> unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory); + for (final FlowFile flowFile : unbinned) { + Bin bin = new Bin(sessionFactory.createSession(), 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null); + bin.offer(flowFile, session); + this.readyBins.add(bin); + } + } + } + + return flowFilesBinned; + } + + @OnScheduled + public final void onScheduled(final ProcessContext context) throws IOException { + binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue()); + + if (context.getProperty(MAX_BIN_AGE).isSet()) { + binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue()); + } else { + binManager.setMaxBinAge(Integer.MAX_VALUE); + } + + if (context.getProperty(MAX_SIZE).isSet()) { + binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue()); + } else { + 
binManager.setMaximumSize(Long.MAX_VALUE); + } + + binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger()); + + if (context.getProperty(MAX_ENTRIES).isSet()) { + binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue()); + } else { + binManager.setMaximumEntries(Integer.MAX_VALUE); + } + + this.setUpBinManager(binManager, context); + } + + @Override + protected final Collection<ValidationResult> customValidate(final ValidationContext context) { + final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context)); + + final long minBytes = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue(); + final Double maxBytes = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B); + + if (maxBytes != null && maxBytes.longValue() < minBytes) { + problems.add( + new ValidationResult.Builder() + .subject(MIN_SIZE.getName()) + .input(context.getProperty(MIN_SIZE).getValue()) + .valid(false) + .explanation("Min Size must be less than or equal to Max Size") + .build() + ); + } + + final Long min = context.getProperty(MIN_ENTRIES).asLong(); + final Long max = context.getProperty(MAX_ENTRIES).asLong(); + + if (min != null && max != null) { + if (min > max) { + problems.add( + new ValidationResult.Builder().subject(MIN_ENTRIES.getName()) + .input(context.getProperty(MIN_ENTRIES).getValue()) + .valid(false) + .explanation("Min Entries must be less than or equal to Max Entries") + .build() + ); + } + } + + Collection<ValidationResult> otherProblems = this.additionalCustomValidation(context); + if (otherProblems != null) { + problems.addAll(otherProblems); + } + + return problems; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java new file mode 100644 index 0000000..d6a8567 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.bin; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; + +/** + * This class is thread safe + * + */ +public class BinManager { + + private final AtomicLong minSizeBytes = new AtomicLong(0L); + private final AtomicLong maxSizeBytes = new AtomicLong(Long.MAX_VALUE); + private final AtomicInteger minEntries = new AtomicInteger(0); + private final AtomicInteger maxEntries = new AtomicInteger(Integer.MAX_VALUE); + private final AtomicReference<String> fileCountAttribute = new AtomicReference<>(null); + + private final AtomicInteger maxBinAgeSeconds = new AtomicInteger(Integer.MAX_VALUE); + private final Map<String, List<Bin>> groupBinMap = new HashMap<>(); + private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock rLock = rwLock.readLock(); + private final Lock wLock = rwLock.writeLock(); + + private int binCount = 0; // guarded by read/write lock + + public BinManager() { + } + + public void purge() { + wLock.lock(); + try { + for (final List<Bin> binList : groupBinMap.values()) { + for (final Bin bin : binList) { + bin.getSession().rollback(); + } + } + groupBinMap.clear(); + binCount = 0; + } finally { + wLock.unlock(); + } + } + + public void setFileCountAttribute(final String fileCountAttribute) { + this.fileCountAttribute.set(fileCountAttribute); + } + + public void setMinimumEntries(final int minimumEntries) { + 
this.minEntries.set(minimumEntries); + } + + public void setMaximumEntries(final int maximumEntries) { + this.maxEntries.set(maximumEntries); + } + + public int getBinCount() { + rLock.lock(); + try { + return binCount; + } finally { + rLock.unlock(); + } + } + + public void setMinimumSize(final long numBytes) { + minSizeBytes.set(numBytes); + } + + public void setMaximumSize(final long numBytes) { + maxSizeBytes.set(numBytes); + } + + public void setMaxBinAge(final int seconds) { + maxBinAgeSeconds.set(seconds); + } + + /** + * Adds the given flowFile to the first available bin in which it fits for the given group or creates a new bin in the specified group if necessary. + * <p/> + * + * @param groupIdentifier the group to which the flow file belongs; can be null + * @param flowFile the flow file to bin + * @param session the ProcessSession to which the FlowFile belongs + * @param sessionFactory a ProcessSessionFactory that can be used to create a new ProcessSession in order to + * create a new bin if necessary + * @return true if added; false if no bin exists which can fit this item and no bin can be created based on current min/max criteria + */ + public boolean offer(final String groupIdentifier, final FlowFile flowFile, final ProcessSession session, final ProcessSessionFactory sessionFactory) { + final long currentMaxSizeBytes = maxSizeBytes.get(); + if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit into any new bins (and probably none existing) + return false; + } + wLock.lock(); + try { + final List<Bin> currentBins = groupBinMap.get(groupIdentifier); + if (currentBins == null) { // this is a new group we need to register + final List<Bin> bins = new ArrayList<>(); + final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), + maxEntries.get(), fileCountAttribute.get()); + bins.add(bin); + groupBinMap.put(groupIdentifier, bins); + binCount++; + return bin.offer(flowFile, session); + } else { + 
for (final Bin bin : currentBins) { + final boolean accepted = bin.offer(flowFile, session); + if (accepted) { + return true; + } + } + + //if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one + final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), + maxEntries.get(), fileCountAttribute.get()); + currentBins.add(bin); + binCount++; + return bin.offer(flowFile, session); + } + } finally { + wLock.unlock(); + } + } + + /** + * Adds the given flowFiles to the first available bin in which it fits for the given group or creates a new bin in the specified group if necessary. + * <p/> + * + * @param groupIdentifier the group to which the flow file belongs; can be null + * @param flowFiles the flow files to bin + * @param session the ProcessSession to which the FlowFiles belong + * @param sessionFactory a ProcessSessionFactory that can be used to create a new ProcessSession in order to + * create a new bin if necessary + * @return all of the FlowFiles that could not be successfully binned + */ + public Set<FlowFile> offer(final String groupIdentifier, final Collection<FlowFile> flowFiles, final ProcessSession session, final ProcessSessionFactory sessionFactory) { + final long currentMaxSizeBytes = maxSizeBytes.get(); + final Set<FlowFile> unbinned = new HashSet<>(); + + wLock.lock(); + try { + flowFileLoop: for (final FlowFile flowFile : flowFiles) { + if (flowFile.getSize() > currentMaxSizeBytes) { //won't fit into any new bins (and probably none existing) + unbinned.add(flowFile); + continue; + } + + final List<Bin> currentBins = groupBinMap.get(groupIdentifier); + if (currentBins == null) { // this is a new group we need to register + final List<Bin> bins = new ArrayList<>(); + final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), + maxEntries.get(), fileCountAttribute.get()); + bins.add(bin); + 
groupBinMap.put(groupIdentifier, bins); + binCount++; + + final boolean added = bin.offer(flowFile, session); + if (!added) { + unbinned.add(flowFile); + } + continue; + } else { + for (final Bin bin : currentBins) { + final boolean accepted = bin.offer(flowFile, session); + if (accepted) { + continue flowFileLoop; + } + } + + //if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one + final Bin bin = new Bin(sessionFactory.createSession(), minSizeBytes.get(), currentMaxSizeBytes, minEntries.get(), + maxEntries.get(), fileCountAttribute.get()); + currentBins.add(bin); + binCount++; + final boolean added = bin.offer(flowFile, session); + if (!added) { + unbinned.add(flowFile); + } + + continue; + } + } + } finally { + wLock.unlock(); + } + + return unbinned; + } + + /** + * Finds all bins that are considered full and removes them from the manager. + * <p/> + * @param relaxFullnessConstraint if false will require bins to be full before considered ready; if true bins only have to meet their minimum size criteria or be 'old' and then they'll be + * considered ready + * @return bins that are considered full + */ + public Collection<Bin> removeReadyBins(boolean relaxFullnessConstraint) { + final Map<String, List<Bin>> newGroupMap = new HashMap<>(); + final List<Bin> readyBins = new ArrayList<>(); + + wLock.lock(); + try { + for (final Map.Entry<String, List<Bin>> group : groupBinMap.entrySet()) { + final List<Bin> remainingBins = new ArrayList<>(); + for (final Bin bin : group.getValue()) { + if (relaxFullnessConstraint && (bin.isFullEnough() || bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS))) { //relaxed check + readyBins.add(bin); + } else if (!relaxFullnessConstraint && bin.isFull()) { //strict check + readyBins.add(bin); + } else { //it isn't time yet... 
+ remainingBins.add(bin); + } + } + if (!remainingBins.isEmpty()) { + newGroupMap.put(group.getKey(), remainingBins); + } + } + groupBinMap.clear(); + groupBinMap.putAll(newGroupMap); + binCount -= readyBins.size(); + } finally { + wLock.unlock(); + } + return readyBins; + } + + public Bin removeOldestBin() { + wLock.lock(); + try { + Bin oldestBin = null; + String oldestBinGroup = null; + + for (final Map.Entry<String, List<Bin>> group : groupBinMap.entrySet()) { + for (final Bin bin : group.getValue()) { + if (oldestBin == null || bin.isOlderThan(oldestBin)) { + oldestBin = bin; + oldestBinGroup = group.getKey(); + } + } + } + + if (oldestBin == null) { + return null; + } + + binCount--; + final List<Bin> bins = groupBinMap.get(oldestBinGroup); + bins.remove(oldestBin); + if (bins.isEmpty()) { + groupBinMap.remove(oldestBinGroup); + } + return oldestBin; + } finally { + wLock.unlock(); + } + } + + /** + * @return true if any current bins are older than the allowable max + */ + public boolean containsOldBins() { + rLock.lock(); + try { + for (final List<Bin> bins : groupBinMap.values()) { + for (final Bin bin : bins) { + if (bin.isOlderThan(maxBinAgeSeconds.get(), TimeUnit.SECONDS)) { + return true; + } + } + } + } finally { + rLock.unlock(); + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java new file mode 100644 index 0000000..9a97671 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen; + +import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.listen.event.Event; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * An 
abstract processor that extends from AbstractListenEventProcessor and adds common functionality for + * batching events into a single FlowFile. + * + * @param <E> the type of Event + */ +public abstract class AbstractListenEventBatchingProcessor<E extends Event> extends AbstractListenEventProcessor<E> { + + public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Max Batch Size") + .description( + "The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with " + + "the <Message Delimiter> up to this configured maximum number of messages") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("1") + .required(true) + .build(); + public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() + .name("Message Delimiter") + .displayName("Batching Message Delimiter") + .description("Specifies the delimiter to place between messages when multiple messages are bundled together (see <Max Batch Size> property).") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("\\n") + .required(true) + .build(); + + // it is only the array reference that is volatile - not the contents. 
+ protected volatile byte[] messageDemarcatorBytes; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(NETWORK_INTF_NAME); + descriptors.add(PORT); + descriptors.add(RECV_BUFFER_SIZE); + descriptors.add(MAX_MESSAGE_QUEUE_SIZE); + descriptors.add(MAX_SOCKET_BUFFER_SIZE); + descriptors.add(CHARSET); + descriptors.add(MAX_BATCH_SIZE); + descriptors.add(MESSAGE_DELIMITER); + descriptors.addAll(getAdditionalProperties()); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.addAll(getAdditionalRelationships()); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + @OnScheduled + public void onScheduled(ProcessContext context) throws IOException { + super.onScheduled(context); + final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); + messageDemarcatorBytes = msgDemarcator.getBytes(charset); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger(); + final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes); + + // if the size is 0 then there was nothing to process so return + // we don't need to yield here because we have a long poll in side of getBatches + if (batches.size() == 0) { + return; + } + + final List<E> allEvents = new ArrayList<>(); + + for (Map.Entry<String,FlowFileEventBatch> entry : batches.entrySet()) { + FlowFile flowFile = entry.getValue().getFlowFile(); + final List<E> events = entry.getValue().getEvents(); + + if (flowFile.getSize() == 0L || events.size() == 0) { + session.remove(flowFile); + getLogger().debug("No data 
written to FlowFile from batch {}; removing FlowFile", new Object[] {entry.getKey()}); + continue; + } + + final Map<String,String> attributes = getAttributes(entry.getValue()); + flowFile = session.putAllAttributes(flowFile, attributes); + + getLogger().debug("Transferring {} to success", new Object[] {flowFile}); + session.transfer(flowFile, REL_SUCCESS); + session.adjustCounter("FlowFiles Transferred to Success", 1L, false); + + // the sender and command will be the same for all events based on the batch key + final String transitUri = getTransitUri(entry.getValue()); + session.getProvenanceReporter().receive(flowFile, transitUri); + + allEvents.addAll(events); + } + + // let sub-classes take any additional actions + postProcess(context, session, allEvents); + } + + /** + * Creates the attributes for the FlowFile of the given batch. + * + * @param batch the current batch + * @return the Map of FlowFile attributes + */ + protected abstract Map<String,String> getAttributes(final FlowFileEventBatch batch); + + /** + * Creates the transit uri to be used when reporting a provenance receive event for the given batch. + * + * @param batch the current batch + * @return the transit uri string + */ + protected abstract String getTransitUri(final FlowFileEventBatch batch); + + /** + * Called at the end of onTrigger to allow sub-classes to take post processing action on the events + * + * @param context the current context + * @param session the current session + * @param events the list of all events processed by the current execution of onTrigger + */ + protected void postProcess(ProcessContext context, ProcessSession session, final List<E> events) { + // empty implementation so sub-classes only have to override if necessary + } + + /** + * Batches together up to the batchSize events. Events are grouped together based on a batch key which + * by default is the sender of the event, but can be override by sub-classes. 
+ * + * This method will return when batchSize has been reached, or when no more events are available on the queue. + * + * @param session the current session + * @param totalBatchSize the total number of events to process + * @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile + * + * @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all + * the batches will be <= batchSize + */ + protected Map<String,FlowFileEventBatch> getBatches(final ProcessSession session, final int totalBatchSize, + final byte[] messageDemarcatorBytes) { + + final Map<String,FlowFileEventBatch> batches = new HashMap<>(); + for (int i=0; i < totalBatchSize; i++) { + final E event = getMessage(true, true, session); + if (event == null) { + break; + } + + final String batchKey = getBatchKey(event); + FlowFileEventBatch batch = batches.get(batchKey); + + // if we don't have a batch for this key then create a new one + if (batch == null) { + batch = new FlowFileEventBatch(session.create(), new ArrayList<E>()); + batches.put(batchKey, batch); + } + + // add the current event to the batch + batch.getEvents().add(event); + + // append the event's data to the FlowFile, write the demarcator first if not on the first event + final boolean writeDemarcator = (i > 0); + try { + final byte[] rawMessage = event.getData(); + FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + if (writeDemarcator) { + out.write(messageDemarcatorBytes); + } + + out.write(rawMessage); + } + }); + + // update the FlowFile reference in the batch object + batch.setFlowFile(appendedFlowFile); + + } catch (final Exception e) { + getLogger().error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again", + new Object[] {e.getMessage()}, e); + errorEvents.offer(event); + break; + } 
+ } + + return batches; + } + + /** + * @param event an event that was pulled off the queue + * + * @return a key to use for batching events together, by default this uses the sender of the + * event, but sub-classes should override this to batch by something else + */ + protected String getBatchKey(final E event) { + return event.getSender(); + } + + /** + * Wrapper to hold a FlowFile and the events that have been appended to it. + */ + protected final class FlowFileEventBatch { + + private FlowFile flowFile; + private List<E> events; + + public FlowFileEventBatch(final FlowFile flowFile, final List<E> events) { + this.flowFile = flowFile; + this.events = events; + } + + public FlowFile getFlowFile() { + return flowFile; + } + + public List<E> getEvents() { + return events; + } + + public void setFlowFile(FlowFile flowFile) { + this.flowFile = flowFile; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java new file mode 100644 index 0000000..43d01b8 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen; + +import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * An abstract processor to extend from when listening for events over a channel. 
This processor + * will start a ChannelDispatcher, and optionally a ChannelResponseDispatcher, in a background + * thread which will end up placing events on a queue to polled by the onTrigger method. Sub-classes + * are responsible for providing the dispatcher implementations. + * + * @param <E> the type of events being produced + */ +public abstract class AbstractListenEventProcessor<E extends Event> extends AbstractProcessor { + + + + public static final PropertyDescriptor PORT = new PropertyDescriptor + .Builder().name("Port") + .description("The port to listen on for communication.") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies the character set of the received data.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Receive Buffer Size") + .description("The size of each buffer used to receive messages. Adjust this value appropriately based on the expected size of the " + + "incoming messages.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("65507 B") + .required(true) + .build(); + public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Max Size of Socket Buffer") + .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " + + "to indicate how big the socket buffer should be. 
If this value is set too low, the buffer may fill up before " + + "the data can be read, and incoming data will be dropped.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .required(true) + .build(); + public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder() + .name("Max Size of Message Queue") + .description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " + + "Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " + + "memory used by the processor.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10000") + .required(true) + .build(); + + // Putting these properties here so sub-classes don't have to redefine them, but they are + // not added to the properties by default since not all processors may need them + + public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() + .name("Max Number of TCP Connections") + .description("The maximum number of concurrent TCP connections to accept.") + .addValidator(StandardValidators.createLongValidator(1, 65535, true)) + .defaultValue("2") + .required(true) + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Messages received successfully will be sent out this relationship.") + .build(); + + public static final int POLL_TIMEOUT_MS = 20; + + protected Set<Relationship> relationships; + protected List<PropertyDescriptor> descriptors; + + protected volatile int port; + protected volatile Charset charset; + protected volatile ChannelDispatcher dispatcher; + protected volatile BlockingQueue<E> events; + protected volatile BlockingQueue<E> errorEvents = new LinkedBlockingQueue<>(); + + @Override + protected void init(final ProcessorInitializationContext context) { 
+ final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(NETWORK_INTF_NAME); + descriptors.add(PORT); + descriptors.add(RECV_BUFFER_SIZE); + descriptors.add(MAX_MESSAGE_QUEUE_SIZE); + descriptors.add(MAX_SOCKET_BUFFER_SIZE); + descriptors.add(CHARSET); + descriptors.addAll(getAdditionalProperties()); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.addAll(getAdditionalRelationships()); + this.relationships = Collections.unmodifiableSet(relationships); + } + + /** + * Override to provide additional relationships for the processor. + * + * @return a list of relationships + */ + protected List<Relationship> getAdditionalRelationships() { + return Collections.EMPTY_LIST; + } + + /** + * Override to provide additional properties for the processor. + * + * @return a list of properties + */ + protected List<PropertyDescriptor> getAdditionalProperties() { + return Collections.EMPTY_LIST; + } + + @Override + public final Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException { + charset = Charset.forName(context.getProperty(CHARSET).getValue()); + port = context.getProperty(PORT).asInteger(); + events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger()); + + final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue(); + final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + + InetAddress nicIPAddress = null; + if (!StringUtils.isEmpty(nicIPAddressStr)) { + NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr); + nicIPAddress = 
netIF.getInetAddresses().nextElement(); + } + + // create the dispatcher and call open() to bind to the given port + dispatcher = createDispatcher(context, events); + dispatcher.open(nicIPAddress, port, maxChannelBufferSize); + + // start a thread to run the dispatcher + final Thread readerThread = new Thread(dispatcher); + readerThread.setName(getClass().getName() + " [" + getIdentifier() + "]"); + readerThread.setDaemon(true); + readerThread.start(); + } + + /** + * @param context the ProcessContext to retrieve property values from + * @return a ChannelDispatcher to handle incoming connections + * + * @throws IOException if unable to listen on the requested port + */ + protected abstract ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<E> events) throws IOException; + + // used for testing to access the random port that was selected + public final int getDispatcherPort() { + return dispatcher == null ? 0 : dispatcher.getPort(); + } + + public int getErrorQueueSize() { + return errorEvents.size(); + } + + public int getQueueSize() { + return events == null ? 0 : events.size(); + } + + @OnUnscheduled + public void onUnscheduled() { + if (dispatcher != null) { + dispatcher.close(); + } + } + + /** + * Creates a pool of ByteBuffers with the given size. + * + * @param poolSize the number of buffers to initialize the pool with + * @param bufferSize the size of each buffer + * @return a blocking queue with size equal to poolSize and each buffer equal to bufferSize + */ + protected BlockingQueue<ByteBuffer> createBufferPool(final int poolSize, final int bufferSize) { + final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(poolSize); + for (int i = 0; i < poolSize; i++) { + bufferPool.offer(ByteBuffer.allocate(bufferSize)); + } + return bufferPool; + } + + /** + * If pollErrorQueue is true, the error queue will be checked first and event will be + * returned from the error queue if available. 
+ * + * If pollErrorQueue is false, or no data is in the error queue, the regular queue is polled. + * + * If longPoll is true, the regular queue will be polled with a short timeout, otherwise it will + * poll with no timeout which will return immediately. + * + * @param longPoll whether or not to poll the main queue with a small timeout + * @param pollErrorQueue whether or not to poll the error queue first + * + * @return an event from one of the queues, or null if none are available + */ + protected E getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) { + E event = null; + if (pollErrorQueue) { + event = errorEvents.poll(); + } + + if (event == null) { + try { + if (longPoll) { + event = events.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } else { + event = events.poll(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } + + if (event != null) { + session.adjustCounter("Messages Received", 1L, false); + } + + return event; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java new file mode 100644 index 0000000..5e4c639 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.AttributeExpression; + +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.Set; + +/** + * Shared properties. 
+ */ +public class ListenerProperties { + + private static final Set<String> interfaceSet = new HashSet<>(); + + static { + try { + final Enumeration<NetworkInterface> interfaceEnum = NetworkInterface.getNetworkInterfaces(); + while (interfaceEnum.hasMoreElements()) { + final NetworkInterface ifc = interfaceEnum.nextElement(); + interfaceSet.add(ifc.getName()); + } + } catch (SocketException e) { + } + } + + public static final PropertyDescriptor NETWORK_INTF_NAME = new PropertyDescriptor.Builder() + .name("Local Network Interface") + .description("The name of a local network interface to be used to restrict listening to a specific LAN.") + .addValidator(new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + ValidationResult result = new ValidationResult.Builder() + .subject("Local Network Interface").valid(true).input(input).build(); + if (interfaceSet.contains(input.toLowerCase())) { + return result; + } + + String message; + String realValue = input; + try { + if (context.isExpressionLanguagePresent(input)) { + AttributeExpression ae = context.newExpressionLanguageCompiler().compile(input); + realValue = ae.evaluate(); + } + + if (interfaceSet.contains(realValue.toLowerCase())) { + return result; + } + + message = realValue + " is not a valid network name. 
Valid names are " + interfaceSet.toString(); + + } catch (IllegalArgumentException e) { + message = "Not a valid AttributeExpression: " + e.getMessage(); + } + result = new ValidationResult.Builder().subject("Local Network Interface") + .valid(false).input(input).explanation(message).build(); + + return result; + } + }) + .expressionLanguageSupported(true) + .build(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java new file mode 100644 index 0000000..5215a21 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import java.nio.channels.SelectionKey; + +/** + * A ChannelDispatcher that handles channels asynchronously. + */ +public interface AsyncChannelDispatcher extends ChannelDispatcher { + + /** + * Informs the dispatcher that the connection for the given key is complete. + * + * @param key a key that was previously selected + */ + void completeConnection(SelectionKey key); + + /** + * Informs the dispatcher that the connection for the given key can be added back for selection. + * + * @param key a key that was previously selected + */ + void addBackForSelection(SelectionKey key); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java new file mode 100644 index 0000000..444aeb1 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.listen.dispatcher; + +import java.io.IOException; +import java.net.InetAddress; + +/** + * Dispatches handlers for a given channel. + */ +public interface ChannelDispatcher extends Runnable { + + /** + * Opens the dispatcher listening on the given port and attempts to set the + * OS socket buffer to maxBufferSize. + * + * @param nicAddress the local network interface to listen on, if null will listen on the wildcard address + * which means listening on all local network interfaces + * + * @param port the port to listen on + * + * @param maxBufferSize the size to set the OS socket buffer to + * + * @throws IOException if an error occurred listening on the given port + */ + void open(InetAddress nicAddress, int port, int maxBufferSize) throws IOException; + + /** + * @return the port being listened to + */ + int getPort(); + + /** + * Closes all listeners and stops all handler threads. + */ + void close(); + +}
