Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 149ad130d -> 82174e460


NIFI-446 Adding a Delimiter Strategy to MergeContent


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3cf65261
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3cf65261
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3cf65261

Branch: refs/heads/develop
Commit: 3cf65261aa141ef116b762d821cc5dbe26bb2bdd
Parents: 5273a63
Author: bbende <[email protected]>
Authored: Tue May 19 21:11:50 2015 -0400
Committer: bbende <[email protected]>
Committed: Tue May 19 21:12:24 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/MergeContent.java  | 92 +++++++++++++++++--
 .../processors/standard/TestMergeContent.java   | 97 ++++++++++++++++++++
 2 files changed, 179 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cf65261/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index b11dee3..2883a75 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -23,6 +23,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -49,6 +50,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
@@ -130,6 +133,11 @@ public class MergeContent extends BinFiles {
             + "have the attributes <fragment.identifier>, <fragment.count>, 
and <fragment.index> or alternatively (for backward compatibility "
             + "purposes) <segment.identifier>, <segment.count>, and 
<segment.index>");
 
+    public static final AllowableValue DELIMITER_STRATEGY_FILENAME = new 
AllowableValue(
+            "Filename", "Filename", "The values of Header, Footer, and 
Demarcator will be retrieved from the contents of a file");
+    public static final AllowableValue DELIMITER_STRATEGY_TEXT = new 
AllowableValue(
+            "Text", "Text", "The values of Header, Footer, and Demarcator will 
be specified as property values");
+
     public static final String MERGE_FORMAT_TAR_VALUE = "TAR";
     public static final String MERGE_FORMAT_ZIP_VALUE = "ZIP";
     public static final String MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE = 
"FlowFile Stream, v3";
@@ -210,26 +218,40 @@ public class MergeContent extends BinFiles {
             .defaultValue(null)
             .build();
 
+    public static final PropertyDescriptor DELIMITER_STRATEGY = new 
PropertyDescriptor.Builder()
+            .required(true)
+            .name("Delimiter Strategy")
+            .description("Determines if Header, Footer, and Demarcator should 
point to files containing the respective content, or if "
+                    + "the values of the properties should be used as the 
content.")
+            .allowableValues(DELIMITER_STRATEGY_FILENAME, 
DELIMITER_STRATEGY_TEXT)
+            .defaultValue(DELIMITER_STRATEGY_FILENAME.getValue())
+            .build();
     public static final PropertyDescriptor HEADER = new 
PropertyDescriptor.Builder()
             .name("Header File")
+            .displayName("Header")
             .description("Filename specifying the header to use. If not 
specified, no header is supplied. This property is valid only when using the "
                     + "binary-concatenation merge strategy; otherwise, it is 
ignored.")
             .required(false)
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
     public static final PropertyDescriptor FOOTER = new 
PropertyDescriptor.Builder()
             .name("Footer File")
+            .displayName("Footer")
             .description("Filename specifying the footer to use. If not 
specified, no footer is supplied. This property is valid only when using the "
                     + "binary-concatenation merge strategy; otherwise, it is 
ignored.")
             .required(false)
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
     public static final PropertyDescriptor DEMARCATOR = new 
PropertyDescriptor.Builder()
             .name("Demarcator File")
+            .displayName("Demarcator")
             .description("Filename specifying the demarcator to use. If not 
specified, no demarcator is supplied. This property is valid only when "
                     + "using the binary-concatenation merge strategy; 
otherwise, it is ignored.")
             .required(false)
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
     public static final PropertyDescriptor COMPRESSION_LEVEL = new 
PropertyDescriptor.Builder()
             .name("Compression Level")
@@ -274,6 +296,7 @@ public class MergeContent extends BinFiles {
         descriptors.add(MAX_SIZE);
         descriptors.add(MAX_BIN_AGE);
         descriptors.add(MAX_BIN_COUNT);
+        descriptors.add(DELIMITER_STRATEGY);
         descriptors.add(HEADER);
         descriptors.add(FOOTER);
         descriptors.add(DEMARCATOR);
@@ -282,6 +305,30 @@ public class MergeContent extends BinFiles {
         return descriptors;
     }
 
+    @Override
+    protected Collection<ValidationResult> 
additionalCustomValidation(ValidationContext context) {
+        Collection<ValidationResult> results = new ArrayList<>();
+
+        final String delimiterStrategy = 
context.getProperty(DELIMITER_STRATEGY).getValue();
+        if(DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategy)) {
+            final String headerValue = context.getProperty(HEADER).getValue();
+            if (headerValue != null) {
+                
results.add(StandardValidators.FILE_EXISTS_VALIDATOR.validate(HEADER.getName(), 
headerValue, context));
+            }
+
+            final String footerValue = context.getProperty(FOOTER).getValue();
+            if (footerValue != null) {
+                
results.add(StandardValidators.FILE_EXISTS_VALIDATOR.validate(FOOTER.getName(), 
footerValue, context));
+            }
+
+            final String demarcatorValue = 
context.getProperty(DEMARCATOR).getValue();
+            if (demarcatorValue != null) {
+                
results.add(StandardValidators.FILE_EXISTS_VALIDATOR.validate(DEMARCATOR.getName(),
 demarcatorValue, context));
+            }
+        }
+        return results;
+    }
+
     private byte[] readContent(final String filename) throws IOException {
         return Files.readAllBytes(Paths.get(filename));
     }
@@ -479,7 +526,7 @@ public class MergeContent extends BinFiles {
             bundle = session.write(bundle, new OutputStreamCallback() {
                 @Override
                 public void process(final OutputStream out) throws IOException 
{
-                    final byte[] header = getDescriptorFileContent(context, 
wrappers, HEADER);
+                    final byte[] header = getDelimiterContent(context, 
wrappers, HEADER);
                     if (header != null) {
                         out.write(header);
                     }
@@ -496,7 +543,7 @@ public class MergeContent extends BinFiles {
                         });
 
                         if (itr.hasNext()) {
-                            final byte[] demarcator = 
getDescriptorFileContent(context, wrappers, DEMARCATOR);
+                            final byte[] demarcator = 
getDelimiterContent(context, wrappers, DEMARCATOR);
                             if (demarcator != null) {
                                 out.write(demarcator);
                             }
@@ -513,7 +560,7 @@ public class MergeContent extends BinFiles {
                         }
                     }
 
-                    final byte[] footer = getDescriptorFileContent(context, 
wrappers, FOOTER);
+                    final byte[] footer = getDelimiterContent(context, 
wrappers, FOOTER);
                     if (footer != null) {
                         out.write(footer);
                     }
@@ -529,12 +576,22 @@ public class MergeContent extends BinFiles {
             return bundle;
         }
 
-        private byte[] getDescriptorFileContent(final ProcessContext context, 
final List<FlowFileSessionWrapper> wrappers, final PropertyDescriptor 
descriptor)
+        private byte[] getDelimiterContent(final ProcessContext context, final 
List<FlowFileSessionWrapper> wrappers, final PropertyDescriptor descriptor)
+                throws IOException {
+            final String delimiterStrategyValue = 
context.getProperty(DELIMITER_STRATEGY).getValue();
+            if (DELIMITER_STRATEGY_FILENAME.equals(delimiterStrategyValue)) {
+                return getDelimiterFileContent(context, wrappers, descriptor);
+            } else {
+                return getDelimiterTextContent(context, wrappers, descriptor);
+            }
+        }
+
+        private byte[] getDelimiterFileContent(final ProcessContext context, 
final List<FlowFileSessionWrapper> wrappers, final PropertyDescriptor 
descriptor)
                 throws IOException {
             byte[] property = null;
-            final String descriptorFile = 
context.getProperty(descriptor).getValue();
-            if (descriptorFile != null && wrappers != null && wrappers.size() 
> 0) {
-                final String content = new String(readContent(descriptorFile));
+            final String descriptorValue = 
context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
+            if (descriptorValue != null && wrappers != null && wrappers.size() 
> 0) {
+                final String content = new 
String(readContent(descriptorValue));
                 final FlowFileSessionWrapper wrapper = wrappers.get(0);
                 if (wrapper != null && content != null) {
                     final FlowFile flowFile = wrapper.getFlowFile();
@@ -547,6 +604,21 @@ public class MergeContent extends BinFiles {
             return property;
         }
 
+        private byte[] getDelimiterTextContent(final ProcessContext context, 
final List<FlowFileSessionWrapper> wrappers, final PropertyDescriptor 
descriptor)
+                throws IOException {
+            byte[] property = null;
+            if (wrappers != null && wrappers.size() > 0) {
+                final FlowFileSessionWrapper wrapper = wrappers.get(0);
+                if (wrapper != null) {
+                    final FlowFile flowFile = wrapper.getFlowFile();
+                    if (flowFile != null) {
+                        property = 
context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue().getBytes();
+                    }
+                }
+            }
+            return property;
+        }
+
         @Override
         public String getMergedContentType() {
             return mimeType;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3cf65261/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index a657453..d2952d2 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -33,9 +34,12 @@ import java.util.zip.ZipInputStream;
 import org.apache.commons.compress.archivers.ArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.io.IOUtils;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
@@ -70,6 +74,99 @@ public class TestMergeContent {
     }
 
     @Test
+    public void testSimpleBinaryConcatWithTextDelimiters() throws IOException, 
InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
+        runner.setProperty(MergeContent.MERGE_FORMAT, 
MergeContent.MERGE_FORMAT_CONCAT);
+        runner.setProperty(MergeContent.DELIMITER_STRATEGY, 
MergeContent.DELIMITER_STRATEGY_TEXT);
+        runner.setProperty(MergeContent.HEADER, "@");
+        runner.setProperty(MergeContent.DEMARCATOR, "#");
+        runner.setProperty(MergeContent.FOOTER, "$");
+
+        createFlowFiles(runner);
+        runner.run();
+
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+
+        final MockFlowFile bundle = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        bundle.assertContentEquals("@Hello#, #World!$".getBytes("UTF-8"));
+        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"application/plain-text");
+    }
+
+    @Test
+    public void testSimpleBinaryConcatWithFileDelimiters() throws IOException, 
InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
+        runner.setProperty(MergeContent.MERGE_FORMAT, 
MergeContent.MERGE_FORMAT_CONCAT);
+        runner.setProperty(MergeContent.DELIMITER_STRATEGY, 
MergeContent.DELIMITER_STRATEGY_FILENAME);
+        runner.setProperty(MergeContent.HEADER, 
getClass().getResource("/TestMergeContent/head").getPath());
+        runner.setProperty(MergeContent.DEMARCATOR, 
getClass().getResource("/TestMergeContent/demarcate").getPath());
+        runner.setProperty(MergeContent.FOOTER, 
getClass().getResource("/TestMergeContent/foot").getPath());
+
+        createFlowFiles(runner);
+        runner.run();
+
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
+        runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
+
+        final MockFlowFile bundle = 
runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
+        bundle.assertContentEquals("(|)Hello***, 
***World!___".getBytes("UTF-8"));
+        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"application/plain-text");
+    }
+
+    @Test
+    public void testTextDelimitersValidation() throws IOException, 
InterruptedException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
+        runner.setProperty(MergeContent.MERGE_FORMAT, 
MergeContent.MERGE_FORMAT_CONCAT);
+        runner.setProperty(MergeContent.DELIMITER_STRATEGY, 
MergeContent.DELIMITER_STRATEGY_TEXT);
+        runner.setProperty(MergeContent.HEADER, "");
+        runner.setProperty(MergeContent.DEMARCATOR, "");
+        runner.setProperty(MergeContent.FOOTER, "");
+
+        Collection<ValidationResult> results = new HashSet<>();
+        ProcessContext context = runner.getProcessContext();
+        if (context instanceof MockProcessContext) {
+            MockProcessContext mockContext = (MockProcessContext)context;
+            results = mockContext.validate();
+        }
+
+        Assert.assertEquals(3, results.size());
+        for (ValidationResult vr : results) {
+            Assert.assertTrue(vr.toString().contains("cannot be empty"));
+        }
+    }
+
+    @Test
+    public void testFileDelimitersValidation() throws IOException, 
InterruptedException {
+        final String doesNotExistFile = 
"src/test/resources/TestMergeContent/does_not_exist";
+        final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
+        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
+        runner.setProperty(MergeContent.MERGE_FORMAT, 
MergeContent.MERGE_FORMAT_CONCAT);
+        runner.setProperty(MergeContent.DELIMITER_STRATEGY, 
MergeContent.DELIMITER_STRATEGY_FILENAME);
+        runner.setProperty(MergeContent.HEADER, doesNotExistFile);
+        runner.setProperty(MergeContent.DEMARCATOR, doesNotExistFile);
+        runner.setProperty(MergeContent.FOOTER, doesNotExistFile);
+
+        Collection<ValidationResult> results = new HashSet<>();
+        ProcessContext context = runner.getProcessContext();
+        if (context instanceof MockProcessContext) {
+            MockProcessContext mockContext = (MockProcessContext)context;
+            results = mockContext.validate();
+        }
+
+        Assert.assertEquals(3, results.size());
+        for (ValidationResult vr : results) {
+            Assert.assertTrue(vr.toString().contains("is invalid because File 
" + doesNotExistFile + " does not exist"));
+        }
+    }
+
+    @Test
     public void testMimeTypeIsOctetStreamIfConflictingWithBinaryConcat() 
throws IOException, InterruptedException {
         final TestRunner runner = TestRunners.newTestRunner(new 
MergeContent());
         runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");

Reply via email to