NIFI-559: Initial implementation of DuplicateFlowFile
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/21c5c48c Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/21c5c48c Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/21c5c48c Branch: refs/heads/develop Commit: 21c5c48cabe8631a173fb2513f7e224bac7a59ab Parents: e7954cf Author: Mark Payne <[email protected]> Authored: Thu Apr 30 13:13:22 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Thu Apr 30 13:13:22 2015 -0400 ---------------------------------------------------------------------- .../processors/standard/DuplicateFlowFile.java | 81 ++++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../standard/TestDuplicateFlowFile.java | 36 +++++++++ 3 files changed, 118 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/21c5c48c/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java new file mode 100644 index 0000000..7400821 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +@EventDriven +@SupportsBatching +@Tags({"test", "load", "duplicate"}) +@CapabilityDescription("Intended for load testing, this processor will create the configured number of copies of each incoming FlowFile") +public class DuplicateFlowFile extends AbstractProcessor { + + static final PropertyDescriptor NUM_COPIES = new PropertyDescriptor.Builder() + .name("Number of Copies") + .description("Specifies how many copies of each incoming FlowFile will be made") + .required(true) + .expressionLanguageSupported(false) + .defaultValue("100") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The original FlowFile and all copies will be sent to this relationship") + .build(); + + @Override + public Set<Relationship> getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return Collections.singletonList(NUM_COPIES); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + for (int i=0; i < context.getProperty(NUM_COPIES).asInteger(); i++) { + final FlowFile copy = session.clone(flowFile); + session.transfer(copy, REL_SUCCESS); + } + + session.transfer(flowFile, REL_SUCCESS); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/21c5c48c/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 7fbd781..17339bc 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -18,6 +18,7 @@ org.apache.nifi.processors.standard.ControlRate org.apache.nifi.processors.standard.ConvertCharacterSet org.apache.nifi.processors.standard.DetectDuplicate org.apache.nifi.processors.standard.DistributeLoad +org.apache.nifi.processors.standard.DuplicateFlowFile org.apache.nifi.processors.standard.EncryptContent org.apache.nifi.processors.standard.EvaluateJsonPath org.apache.nifi.processors.standard.EvaluateRegularExpression http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/21c5c48c/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java new file mode 100644 index 0000000..82fee1b --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestDuplicateFlowFile { + + @Test + public void test() { + final TestRunner runner = TestRunners.newTestRunner(DuplicateFlowFile.class); + runner.setProperty(DuplicateFlowFile.NUM_COPIES, "100"); + + runner.enqueue("hello".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, 101); + } + +}
