http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
index 0000000,6db2757..70ac5ac
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutFile.java
@@@ -1,0 -1,367 +1,367 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.nio.file.Paths;
+ import java.nio.file.attribute.PosixFileAttributeView;
+ import java.nio.file.attribute.PosixFilePermissions;
+ import java.nio.file.attribute.UserPrincipalLookupService;
+ import java.text.DateFormat;
+ import java.text.SimpleDateFormat;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.Date;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Locale;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.StopWatch;
+ 
+ @SupportsBatching
+ @Tags({"put", "local", "copy", "archive", "files", "filesystem"})
+ @CapabilityDescription("Writes the contents of a FlowFile to the local file 
system")
+ public class PutFile extends AbstractProcessor {
+ 
+     // Allowable values of the "Conflict Resolution Strategy" property; onTrigger switches on
+     // these when a file with the same name already exists in the output directory.
+     public static final String REPLACE_RESOLUTION = "replace";
+     public static final String IGNORE_RESOLUTION = "ignore";
+     public static final String FAIL_RESOLUTION = "fail";
+ 
+     // Name of the FlowFile attribute suggested (via expression language) as the source of the
+     // last-modified time, and the SimpleDateFormat pattern onTrigger uses to parse that value.
+     public static final String FILE_MODIFY_DATE_ATTRIBUTE = 
"file.lastModifiedTime";
+     public static final String FILE_MODIFY_DATE_ATTR_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ssZ";
+ 
+     // Required output directory; evaluated against each FlowFile's attributes.
+     public static final PropertyDescriptor DIRECTORY = new 
PropertyDescriptor.Builder()
+             .name("Directory")
+             .description("The directory to which files should be written. You 
may use expression language such as /aa/bb/${path}")
+             .required(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+     // Optional cap on the number of entries in the output directory; when unset, onTrigger
+     // skips the file-count check entirely.
+     public static final PropertyDescriptor MAX_DESTINATION_FILES = new 
PropertyDescriptor.Builder()
+             .name("Maximum File Count")
+             .description("Specifies the maximum number of files that can 
exist in the output directory")
+             .required(false)
+             .addValidator(StandardValidators.INTEGER_VALIDATOR)
+             .build();
+     // Required strategy for name collisions; defaults to "fail".
+     public static final PropertyDescriptor CONFLICT_RESOLUTION = new 
PropertyDescriptor.Builder()
+             .name("Conflict Resolution Strategy")
+             .description("Indicates what should happen when a file with the 
same name already exists in the output directory")
+             .required(true)
+             .defaultValue(FAIL_RESOLUTION)
+             .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, 
FAIL_RESOLUTION)
+             .build();
+     // Optional last-modified timestamp to stamp on the written file (expression language allowed).
+     public static final PropertyDescriptor CHANGE_LAST_MODIFIED_TIME = new 
PropertyDescriptor.Builder()
+             .name("Last Modified Time")
+             .description("Sets the lastModifiedTime on the output file to the 
value of this attribute.  Format must be yyyy-MM-dd'T'HH:mm:ssZ.  You may also 
use expression language such as ${file.lastModifiedTime}.")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+     // Optional POSIX permissions for the written file; the value is normalized by
+     // stringPermissions before being applied.
+     public static final PropertyDescriptor CHANGE_PERMISSIONS = new 
PropertyDescriptor.Builder()
+             .name("Permissions")
+             .description("Sets the permissions on the output file to the 
value of this attribute.  Format must be either UNIX rwxrwxrwx with a - in 
place of denied permissions (e.g. rw-r--r--) or an octal number (e.g. 644).  
You may also use expression language such as ${file.permissions}.")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+     // Optional owner name for the written file (expression language allowed).
+     public static final PropertyDescriptor CHANGE_OWNER = new 
PropertyDescriptor.Builder()
+             .name("Owner")
+             .description("Sets the owner on the output file to the value of 
this attribute.  You may also use expression language such as ${file.owner}.")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+     // Optional group name for the written file (expression language allowed).
+     public static final PropertyDescriptor CHANGE_GROUP = new 
PropertyDescriptor.Builder()
+             .name("Group")
+             .description("Sets the group on the output file to the value of 
this attribute.  You may also use expression language such as ${file.group}.")
+             .required(false)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .expressionLanguageSupported(true)
+             .build();
+     // Whether a missing output directory is created ("true", the default) or treated as a failure.
+     public static final PropertyDescriptor CREATE_DIRS = new 
PropertyDescriptor.Builder()
+             .name("Create Missing Directories")
+             .description("If true, then missing destination directories will 
be created. If false, flowfiles are penalized and sent to failure.")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("true")
+             .build();
+ 
+     // Upper bound on retry attempts. NOTE(review): confirm which operation this bounds; the
+     // rename retry in onTrigger uses the same count.
+     public static final int MAX_FILE_LOCK_ATTEMPTS = 10;
+     // Destination for FlowFiles whose content was written (or deliberately ignored on conflict).
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("Files that have been 
successfully written to the output directory are transferred to this 
relationship").build();
+     // Destination for FlowFiles that could not be written.
+     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("Files that could not be 
written to the output directory for some reason are transferred to this 
relationship").build();
+ 
+     // Unmodifiable views populated once in init and handed out by the two getters below.
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     /**
+      * Populates the unmodifiable property list and relationship set exposed by this processor.
+      *
+      * @param context initialization context supplied by the framework; it is not consulted here
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         // supported properties, in registration order
+         final List<PropertyDescriptor> descriptorList = new ArrayList<>();
+         descriptorList.add(DIRECTORY);
+         descriptorList.add(CONFLICT_RESOLUTION);
+         descriptorList.add(CREATE_DIRS);
+         descriptorList.add(MAX_DESTINATION_FILES);
+         descriptorList.add(CHANGE_LAST_MODIFIED_TIME);
+         descriptorList.add(CHANGE_PERMISSIONS);
+         descriptorList.add(CHANGE_OWNER);
+         descriptorList.add(CHANGE_GROUP);
+         this.properties = Collections.unmodifiableList(descriptorList);
+ 
+         // outgoing relationships
+         final Set<Relationship> relationshipSet = new HashSet<>();
+         relationshipSet.add(REL_SUCCESS);
+         relationshipSet.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationshipSet);
+     }
+ 
+     /**
+      * Returns the relationships this processor can route FlowFiles to.
+      *
+      * @return the unmodifiable relationship set assigned in {@code init}
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return this.relationships;
+     }
+ 
+     /**
+      * Returns the properties this processor supports.
+      *
+      * @return the unmodifiable property list assigned in {@code init}
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return this.properties;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+         FlowFile flowFile = session.get();
+         if (flowFile == null) {
+             return;
+         }
+ 
+         final StopWatch stopWatch = new StopWatch(true);
+         final Path configuredRootDirPath = 
Paths.get(context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue());
+         final String conflictResponse = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
+         final Integer maxDestinationFiles = 
context.getProperty(MAX_DESTINATION_FILES).asInteger();
+         final ProcessorLog logger = getLogger();
+ 
+         Path tempDotCopyFile = null;
+         try {
+             final Path rootDirPath = configuredRootDirPath;
+             final Path tempCopyFile = rootDirPath.resolve("." + 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+             final Path copyFile = 
rootDirPath.resolve(flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ 
+             if (!Files.exists(rootDirPath)) {
+                 if (context.getProperty(CREATE_DIRS).asBoolean()) {
+                     Files.createDirectories(rootDirPath);
+                 } else {
+                     flowFile = session.penalize(flowFile);
+                     session.transfer(flowFile, REL_FAILURE);
+                     logger.error("Penalizing {} and routing to 'failure' 
because the output directory {} does not exist and Processor is configured not 
to create missing directories", new Object[]{flowFile, rootDirPath});
+                     return;
+                 }
+             }
+ 
+             final Path dotCopyFile = tempCopyFile;
+             tempDotCopyFile = dotCopyFile;
+             Path finalCopyFile = copyFile;
+ 
+             final Path finalCopyFileDir = finalCopyFile.getParent();
+             if (Files.exists(finalCopyFileDir) && maxDestinationFiles != 
null) { // check if too many files already
+                 final int numFiles = finalCopyFileDir.toFile().list().length;
+ 
+                 if (numFiles >= maxDestinationFiles) {
+                     flowFile = session.penalize(flowFile);
+                     logger.info("Penalizing {} and routing to 'failure' 
because the output directory {} has {} files, which exceeds the configured 
maximum number of files", new Object[]{flowFile, finalCopyFileDir, numFiles});
+                     session.transfer(flowFile, REL_FAILURE);
+                     return;
+                 }
+             }
+ 
+             if (Files.exists(finalCopyFile)) {
+                 switch (conflictResponse) {
+                     case REPLACE_RESOLUTION:
+                         Files.delete(finalCopyFile);
+                         logger.info("Deleted {} as configured in order to 
replace with the contents of {}", new Object[]{finalCopyFile, flowFile});
+                         break;
+                     case IGNORE_RESOLUTION:
+                         session.transfer(flowFile, REL_SUCCESS);
+                         logger.info("Transferring {} to success because file 
with same name already exists", new Object[]{flowFile});
+                         return;
+                     case FAIL_RESOLUTION:
+                         flowFile = session.penalize(flowFile);
+                         logger.info("Penalizing {} and routing to failure as 
configured because file with the same name already exists", new 
Object[]{flowFile});
+                         session.transfer(flowFile, REL_FAILURE);
+                         return;
+                     default:
+                         break;
+                 }
+             }
+ 
+             session.exportTo(flowFile, dotCopyFile, false);
+ 
+             final String lastModifiedTime = 
context.getProperty(CHANGE_LAST_MODIFIED_TIME).evaluateAttributeExpressions(flowFile).getValue();
+             if (lastModifiedTime != null && 
!lastModifiedTime.trim().isEmpty()) {
+                 try {
+                     final DateFormat formatter = new 
SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
+                     final Date fileModifyTime = 
formatter.parse(lastModifiedTime);
+                     
dotCopyFile.toFile().setLastModified(fileModifyTime.getTime());
+                 } catch (Exception e) {
+                     logger.warn("Could not set file lastModifiedTime to {} 
because {}", new Object[]{lastModifiedTime, e});
+                 }
+             }
+ 
+             final String permissions = 
context.getProperty(CHANGE_PERMISSIONS).evaluateAttributeExpressions(flowFile).getValue();
+             if (permissions != null && !permissions.trim().isEmpty()) {
+                 try {
+                     String perms = stringPermissions(permissions);
+                     if (!perms.isEmpty()) {
+                         Files.setPosixFilePermissions(dotCopyFile, 
PosixFilePermissions.fromString(perms));
+                     }
+                 } catch (Exception e) {
+                     logger.warn("Could not set file permissions to {} because 
{}", new Object[]{permissions, e});
+                 }
+             }
+ 
+             final String owner = 
context.getProperty(CHANGE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
+             if (owner != null && !owner.trim().isEmpty()) {
+                 try {
+                     UserPrincipalLookupService lookupService = 
dotCopyFile.getFileSystem().getUserPrincipalLookupService();
+                     Files.setOwner(dotCopyFile, 
lookupService.lookupPrincipalByName(owner));
+                 } catch (Exception e) {
+                     logger.warn("Could not set file owner to {} because {}", 
new Object[]{owner, e});
+                 }
+             }
+ 
+             final String group = 
context.getProperty(CHANGE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
+             if (group != null && !group.trim().isEmpty()) {
+                 try {
+                     UserPrincipalLookupService lookupService = 
dotCopyFile.getFileSystem().getUserPrincipalLookupService();
+                     PosixFileAttributeView view = 
Files.getFileAttributeView(dotCopyFile, PosixFileAttributeView.class);
+                     
view.setGroup(lookupService.lookupPrincipalByGroupName(group));
+                 } catch (Exception e) {
+                     logger.warn("Could not set file group to {} because {}", 
new Object[]{group, e});
+                 }
+             }
+ 
+             boolean renamed = false;
+             for (int i = 0; i < 10; i++) { // try rename up to 10 times.
+                 if (dotCopyFile.toFile().renameTo(finalCopyFile.toFile())) {
+                     renamed = true;
+                     break;// rename was successful
+                 }
+                 Thread.sleep(100L);// try waiting a few ms to let whatever 
might cause rename failure to resolve
+             }
+ 
+             if (!renamed) {
+                 if (Files.exists(dotCopyFile) && 
dotCopyFile.toFile().delete()) {
+                     logger.debug("Deleted dot copy file {}", new 
Object[]{dotCopyFile});
+                 }
+                 throw new ProcessException("Could not rename: " + 
dotCopyFile);
+             } else {
+                 logger.info("Produced copy of {} at location {}", new 
Object[]{flowFile, finalCopyFile});
+             }
+ 
+             session.getProvenanceReporter().send(flowFile, 
finalCopyFile.toFile().toURI().toString(), 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+             session.transfer(flowFile, REL_SUCCESS);
+         } catch (final Throwable t) {
+             if (tempDotCopyFile != null) {
+                 try {
+                     Files.deleteIfExists(tempDotCopyFile);
+                 } catch (final Exception e) {
+                     logger.error("Unable to remove temporary file {} due to 
{}", new Object[]{tempDotCopyFile, e});
+                 }
+             }
+ 
+             flowFile = session.penalize(flowFile);
+             logger.error("Penalizing {} and transferring to failure due to 
{}", new Object[]{flowFile, t});
+             session.transfer(flowFile, REL_FAILURE);
+         }
+     }
+ 
+     /**
+      * Normalizes a permission specification into the nine-character symbolic form
+      * (for example {@code rw-r--r--}) that the caller passes to
+      * {@code PosixFilePermissions.fromString}.
+      * <p>
+      * Two input forms are recognized: a string of exactly nine characters drawn from
+      * {@code r}, {@code w}, {@code x} and {@code -}, which is returned unchanged, and a string of
+      * digits, which is parsed as an octal mode (for example {@code 644}). For octal input only
+      * the low nine bits (owner, group, others) are translated; higher bits are ignored.
+      *
+      * @param perms the permission specification; may be {@code null}
+      * @return the nine-character symbolic permissions, or an empty string when {@code perms} is
+      *         {@code null}, matches neither form, or is not a valid octal number
+      */
+     protected String stringPermissions(String perms) {
+         if (perms == null) {
+             return "";
+         }
+ 
+         final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$");
+         final Pattern numPattern = Pattern.compile("\\d+");
+         if (rwxPattern.matcher(perms).matches()) {
+             return perms;
+         }
+         if (!numPattern.matcher(perms).matches()) {
+             return "";
+         }
+ 
+         final int number;
+         try {
+             number = Integer.parseInt(perms, 8);
+         } catch (NumberFormatException ignored) {
+             // contains the digits 8 or 9, or is too large for an int: not a usable octal mode
+             return "";
+         }
+ 
+         // Walk the nine permission bits from owner-read (0400) down to others-execute (0001).
+         // Deriving each mask from the position keeps every symbol paired with its own bit; the
+         // previous hand-written chain tested the group-execute mask for others-execute.
+         final String symbols = "rwxrwxrwx";
+         final StringBuilder permBuilder = new StringBuilder(symbols.length());
+         for (int i = 0; i < symbols.length(); i++) {
+             final int mask = 1 << (symbols.length() - 1 - i);
+             permBuilder.append((number & mask) != 0 ? symbols.charAt(i) : '-');
+         }
+         return permBuilder.toString();
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
index 0000000,39b17c7..ce5bea5
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java
@@@ -1,0 -1,374 +1,374 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import static 
org.apache.nifi.processors.standard.util.JmsFactory.ATTRIBUTE_PREFIX;
+ import static 
org.apache.nifi.processors.standard.util.JmsFactory.ATTRIBUTE_TYPE_SUFFIX;
+ import static 
org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_BOOLEAN;
+ import static 
org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_BYTE;
+ import static 
org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_DOUBLE;
+ import static 
org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_FLOAT;
+ import static 
org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_INTEGER;
+ import static 
org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_LONG;
+ import static 
org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_OBJECT;
+ import static 
org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_SHORT;
+ import static 
org.apache.nifi.processors.standard.util.JmsFactory.PROP_TYPE_STRING;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.ATTRIBUTES_TO_JMS_PROPS;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.MAX_BUFFER_SIZE;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_PRIORITY;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_TTL;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_TYPE;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_BYTE;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_EMPTY;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_STREAM;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.MSG_TYPE_TEXT;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QUEUE;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
+ import static 
org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
+ import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.nio.charset.Charset;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Map.Entry;
+ import java.util.Queue;
+ import java.util.Set;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ 
+ import javax.jms.BytesMessage;
+ import javax.jms.Destination;
+ import javax.jms.JMSException;
+ import javax.jms.Message;
+ import javax.jms.MessageProducer;
+ import javax.jms.Session;
+ import javax.jms.StreamMessage;
+ 
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnStopped;
 -import org.apache.nifi.processor.annotation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processors.standard.util.JmsFactory;
+ import org.apache.nifi.processors.standard.util.WrappedMessageProducer;
+ 
+ @Tags({"jms", "send", "put"})
+ @CapabilityDescription("Creates a JMS Message from the contents of a FlowFile 
and sends the message to a JMS Server")
+ public class PutJMS extends AbstractProcessor {
+ 
+     // UTF-8 charset constant.
+     public static final Charset UTF8 = Charset.forName("UTF-8");
+     // Priority onTrigger falls back to when the Message Priority property yields no value or
+     // cannot be parsed as an integer.
+     public static final int DEFAULT_MESSAGE_PRIORITY = 4;
+ 
+     // Destination for FlowFiles whose messages were sent and committed.
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+             .description("All FlowFiles that are sent to the JMS destination 
are routed to this relationship").build();
+     // Destination for FlowFiles that could not be sent.
+     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+             .description("All FlowFiles that cannot be routed to the JMS 
destination are routed to this relationship").build();
+ 
+     // Pool of open producers: onTrigger polls one (creating it when the pool is empty) and
+     // offers it back if it is still open; cleanupResources drains and closes the pool.
+     private final Queue<WrappedMessageProducer> producerQueue = new 
LinkedBlockingQueue<>();
+     // Unmodifiable views assigned once in the constructor.
+     private final List<PropertyDescriptor> properties;
+     private final Set<Relationship> relationships;
+ 
+     /**
+      * Assembles the unmodifiable relationship set and the ordered, unmodifiable list of
+      * supported properties.
+      */
+     public PutJMS() {
+         final Set<Relationship> routes = new HashSet<>();
+         routes.add(REL_SUCCESS);
+         routes.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(routes);
+ 
+         // the list order below is the order in which the properties are registered
+         final List<PropertyDescriptor> supported = new ArrayList<>();
+         supported.add(JMS_PROVIDER);
+         supported.add(URL);
+         supported.add(DESTINATION_NAME);
+         supported.add(DESTINATION_TYPE);
+         supported.add(TIMEOUT);
+         supported.add(BATCH_SIZE);
+         supported.add(USERNAME);
+         supported.add(PASSWORD);
+         supported.add(MESSAGE_TYPE);
+         supported.add(MESSAGE_PRIORITY);
+         supported.add(REPLY_TO_QUEUE);
+         supported.add(MAX_BUFFER_SIZE);
+         supported.add(MESSAGE_TTL);
+         supported.add(ATTRIBUTES_TO_JMS_PROPS);
+         supported.add(CLIENT_ID_PREFIX);
+         this.properties = Collections.unmodifiableList(supported);
+     }
+ 
+     /**
+      * Returns the properties this processor supports.
+      *
+      * @return the unmodifiable property list assigned in the constructor
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return this.properties;
+     }
+ 
+     /**
+      * Returns the relationships this processor can route FlowFiles to.
+      *
+      * @return the unmodifiable relationship set assigned in the constructor
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return this.relationships;
+     }
+ 
+     /**
+      * Drains the producer pool when the processor is stopped, closing each pooled producer.
+      */
+     @OnStopped
+     public void cleanupResources() {
+         WrappedMessageProducer pooled;
+         while ((pooled = producerQueue.poll()) != null) {
+             pooled.close(getLogger());
+         }
+     }
+ 
+     /**
+      * Sends a batch of FlowFiles to the JMS destination, one message per FlowFile.
+      * <p>
+      * Up to "batch size" FlowFiles are pulled from the session. A producer is taken from the pool
+      * (or created when the pool is empty), each FlowFile's content is read into memory, turned
+      * into a message and sent, and the JMS session is committed once after the whole batch.
+      * FlowFiles larger than the configured maximum buffer size are routed to {@code failure}
+      * without being sent. If any send fails, the whole batch is routed to {@code failure}, the
+      * JMS session is rolled back and the producer is closed. If the final commit fails, the
+      * ProcessSession is rolled back and the producer is closed.
+      *
+      * @param context provides the configured property values
+      * @param session supplies the FlowFiles and receives transfers and provenance events
+      * @throws ProcessException declared by the signature; not thrown explicitly in this method
+      */
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+         final ProcessorLog logger = getLogger();
+         final List<FlowFile> flowFiles = 
session.get(context.getProperty(BATCH_SIZE).asInteger().intValue());
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+ 
+         // Reuse a pooled producer when one is available; otherwise open a new connection.
+         WrappedMessageProducer wrappedProducer = producerQueue.poll();
+         if (wrappedProducer == null) {
+             try {
+                 // NOTE(review): the boolean argument presumably requests a transacted session,
+                 // since commit/rollback are called on it below - confirm against JmsFactory
+                 wrappedProducer = JmsFactory.createMessageProducer(context, 
true);
+                 logger.info("Connected to JMS server {}", new 
Object[]{context.getProperty(URL).getValue()});
+             } catch (final JMSException e) {
+                 // No connection: fail the whole batch and yield the processor
+                 logger.error("Failed to connect to JMS Server due to {}", new 
Object[]{e});
+                 session.transfer(flowFiles, REL_FAILURE);
+                 context.yield();
+                 return;
+             }
+         }
+ 
+         final Session jmsSession = wrappedProducer.getSession();
+         final MessageProducer producer = wrappedProducer.getProducer();
+ 
+         final int maxBufferSize = 
context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ 
+         try {
+             // FlowFiles whose message was sent; transferred to success only after the commit
+             final Set<FlowFile> successfulFlowFiles = new HashSet<>();
+ 
+             for (FlowFile flowFile : flowFiles) {
+                 // Content is buffered fully in memory, so oversized FlowFiles are rejected up front
+                 if (flowFile.getSize() > maxBufferSize) {
+                     session.transfer(flowFile, REL_FAILURE);
+                     logger.warn("Routing {} to failure because its size 
exceeds the configured max", new Object[]{flowFile});
+                     continue;
+                 }
+ 
+                 // Read the contents of the FlowFile into a byte array
+                 final byte[] messageContent = new byte[(int) 
flowFile.getSize()];
+                 session.read(flowFile, new InputStreamCallback() {
+                     @Override
+                     public void process(final InputStream in) throws 
IOException {
+                         StreamUtils.fillBuffer(in, messageContent, true);
+                     }
+                 });
+ 
+                 final Long ttl = 
context.getProperty(MESSAGE_TTL).asTimePeriod(TimeUnit.MILLISECONDS);
+ 
+                 final String replyToQueueName = 
context.getProperty(REPLY_TO_QUEUE).evaluateAttributeExpressions(flowFile).getValue();
+                 final Destination replyToQueue = replyToQueueName == null ? 
null : JmsFactory.createQueue(context, replyToQueueName);
+ 
+                 // Priority comes from the property (evaluated per FlowFile); an unset or
+                 // non-numeric value falls back to DEFAULT_MESSAGE_PRIORITY
+                 int priority = DEFAULT_MESSAGE_PRIORITY;
+                 try {
+                     final Integer priorityInt = 
context.getProperty(MESSAGE_PRIORITY).evaluateAttributeExpressions(flowFile).asInteger();
+                     priority = priorityInt == null ? priority : priorityInt;
+                 } catch (final NumberFormatException e) {
+                     logger.warn("Invalid value for JMS Message Priority: {}; 
defaulting to priority of {}", new Object[]{
+                         
context.getProperty(MESSAGE_PRIORITY).evaluateAttributeExpressions(flowFile).getValue(),
 DEFAULT_MESSAGE_PRIORITY});
+                 }
+ 
+                 try {
+                     final Message message = createMessage(jmsSession, 
context, messageContent, flowFile, replyToQueue, priority);
+                     // A time-to-live of 0 is set when no TTL is configured
+                     if (ttl == null) {
+                         producer.setTimeToLive(0L);
+                     } else {
+                         producer.setTimeToLive(ttl);
+                     }
+                     producer.send(message);
+                 } catch (final JMSException e) {
+                     // One failed send fails the entire batch (note: flowFiles, not flowFile),
+                     // rolls back the JMS session and discards the producer
+                     logger.error("Failed to send {} to JMS Server due to {}", 
new Object[]{flowFile, e});
+                     session.transfer(flowFiles, REL_FAILURE);
+                     context.yield();
+ 
+                     try {
+                         jmsSession.rollback();
+                     } catch (final JMSException jmse) {
+                         logger.warn("Unable to roll back JMS Session due to 
{}", new Object[]{jmse});
+                     }
+ 
+                     wrappedProducer.close(logger);
+                     return;
+                 }
+ 
+                 successfulFlowFiles.add(flowFile);
+                 session.getProvenanceReporter().send(flowFile, "jms://" + 
context.getProperty(URL).getValue());
+             }
+ 
+             try {
+                 jmsSession.commit();
+ 
+                 session.transfer(successfulFlowFiles, REL_SUCCESS);
+                 // Log the individual FlowFiles only for small batches; otherwise just the count
+                 final String flowFileDescription = successfulFlowFiles.size() 
> 10 ? successfulFlowFiles.size() + " FlowFiles" : 
successfulFlowFiles.toString();
+                 logger.info("Sent {} to JMS Server and transferred to 
'success'", new Object[]{flowFileDescription});
+             } catch (JMSException e) {
+                 // Commit failed: roll back the ProcessSession (not the JMS session) and
+                 // discard the producer
+                 logger.error("Failed to commit JMS Session due to {}; rolling 
back session", new Object[]{e});
+                 session.rollback();
+                 wrappedProducer.close(logger);
+             }
+         } finally {
+             // Return the producer to the pool unless one of the error paths above closed it
+             if (!wrappedProducer.isClosed()) {
+                 producerQueue.offer(wrappedProducer);
+             }
+         }
+     }
+ 
+     /**
+      * Builds the JMS message for a single FlowFile.
+      * <p>
+      * The concrete message class is selected by the configured MESSAGE_TYPE
+      * property; any value other than the empty, stream or text types yields a
+      * {@link BytesMessage}. The JMS timestamp is set to the current time, and
+      * the reply-to destination and priority are applied only when non-null.
+      * FlowFile attributes are copied onto the message when the
+      * ATTRIBUTES_TO_JMS_PROPS property evaluates to true.
+      *
+      * @param jmsSession the JMS session used to create the message
+      * @param context the process context supplying the processor properties
+      * @param messageContent the FlowFile content to use as the message payload
+      * @param flowFile the FlowFile whose attributes may be copied to the message
+      * @param replyToQueue the reply-to destination, or null for none
+      * @param priority the JMS priority, or null to leave it unset
+      * @return the populated message
+      * @throws JMSException if the message cannot be created or populated
+      */
+     private Message createMessage(final Session jmsSession, final ProcessContext context, final byte[] messageContent,
+             final FlowFile flowFile, final Destination replyToQueue, final Integer priority) throws JMSException {
+         final String messageType = context.getProperty(MESSAGE_TYPE).getValue();
+ 
+         final Message jmsMessage;
+         switch (messageType) {
+             case MSG_TYPE_EMPTY:
+                 // an "empty" message is a text message without any content
+                 jmsMessage = jmsSession.createTextMessage("");
+                 break;
+             case MSG_TYPE_STREAM: {
+                 final StreamMessage stream = jmsSession.createStreamMessage();
+                 stream.writeBytes(messageContent);
+                 jmsMessage = stream;
+                 break;
+             }
+             case MSG_TYPE_TEXT:
+                 jmsMessage = jmsSession.createTextMessage(new String(messageContent, UTF8));
+                 break;
+             case MSG_TYPE_BYTE:
+             default: {
+                 final BytesMessage bytes = jmsSession.createBytesMessage();
+                 bytes.writeBytes(messageContent);
+                 jmsMessage = bytes;
+                 break;
+             }
+         }
+ 
+         jmsMessage.setJMSTimestamp(System.currentTimeMillis());
+ 
+         if (null != replyToQueue) {
+             jmsMessage.setJMSReplyTo(replyToQueue);
+         }
+ 
+         if (null != priority) {
+             jmsMessage.setJMSPriority(priority);
+         }
+ 
+         if (context.getProperty(ATTRIBUTES_TO_JMS_PROPS).asBoolean()) {
+             copyAttributesToJmsProps(flowFile, jmsMessage);
+         }
+ 
+         return jmsMessage;
+     }
+ 
+     /**
+      * Copies FlowFile attributes onto a JMS message as message properties.
+      * <p>
+      * Every attribute whose key starts with ATTRIBUTE_PREFIX
+      * (<code>jms.</code>) and does not end with ATTRIBUTE_TYPE_SUFFIX is
+      * written to the message; both checks ignore case. The property name is
+      * the attribute key with the prefix removed. For example, the attribute
+      * <code>jms.count</code> = <code>8</code> adds a String property named
+      * <code>count</code> with value <code>8</code>.
+      * <p>
+      * If the FlowFile also has an attribute named like the key plus the type
+      * suffix (e.g. <code>jms.count.type</code>), its value selects the JMS
+      * property type. For example:
+      * <pre>
+      * jms.count      = 8
+      * jms.count.type = integer
+      * </pre>
+      * adds an INTEGER property <code>count</code> with the value 8.
+      * <p>
+      * If the declared type is not one of the supported types, or the value
+      * cannot be parsed as the declared numeric type (e.g.
+      * <code>jms.count.type</code> = <code>integer</code> and
+      * <code>jms.count</code> = <code>hello</code>), a warning is logged and the
+      * property is not added to <code>message</code>.
+      *
+      * @param flowFile The flow file whose attributes should be examined for
+      * JMS properties.
+      * @param message The JMS message to which we want to add properties.
+      * @throws JMSException if the JMS message rejects a property
+      */
+     private void copyAttributesToJmsProps(final FlowFile flowFile, final Message message) throws JMSException {
+         final ProcessorLog logger = getLogger();
+ 
+         // Loop-invariant lower-cased forms used for the case-insensitive key checks
+         final String lowerPrefix = ATTRIBUTE_PREFIX.toLowerCase();
+         final String lowerTypeSuffix = ATTRIBUTE_TYPE_SUFFIX.toLowerCase();
+ 
+         final Map<String, String> attributes = flowFile.getAttributes();
+         for (final Entry<String, String> entry : attributes.entrySet()) {
+             final String key = entry.getKey();
+             final String value = entry.getValue();
+             final String lowerKey = key.toLowerCase();
+ 
+             // Skip attributes outside the JMS namespace and the ".type" companions themselves
+             if (!lowerKey.startsWith(lowerPrefix) || lowerKey.endsWith(lowerTypeSuffix)) {
+                 continue;
+             }
+ 
+             final String jmsPropName = key.substring(ATTRIBUTE_PREFIX.length());
+             final String typeKey = key + ATTRIBUTE_TYPE_SUFFIX;
+             final String type = attributes.get(typeKey);
+ 
+             try {
+                 if (type == null || type.equalsIgnoreCase(PROP_TYPE_STRING)) {
+                     message.setStringProperty(jmsPropName, value);
+                 } else if (type.equalsIgnoreCase(PROP_TYPE_INTEGER)) {
+                     message.setIntProperty(jmsPropName, Integer.parseInt(value));
+                 } else if (type.equalsIgnoreCase(PROP_TYPE_BOOLEAN)) {
+                     message.setBooleanProperty(jmsPropName, Boolean.parseBoolean(value));
+                 } else if (type.equalsIgnoreCase(PROP_TYPE_SHORT)) {
+                     message.setShortProperty(jmsPropName, Short.parseShort(value));
+                 } else if (type.equalsIgnoreCase(PROP_TYPE_LONG)) {
+                     message.setLongProperty(jmsPropName, Long.parseLong(value));
+                 } else if (type.equalsIgnoreCase(PROP_TYPE_BYTE)) {
+                     message.setByteProperty(jmsPropName, Byte.parseByte(value));
+                 } else if (type.equalsIgnoreCase(PROP_TYPE_DOUBLE)) {
+                     message.setDoubleProperty(jmsPropName, Double.parseDouble(value));
+                 } else if (type.equalsIgnoreCase(PROP_TYPE_FLOAT)) {
+                     message.setFloatProperty(jmsPropName, Float.parseFloat(value));
+                 } else if (type.equalsIgnoreCase(PROP_TYPE_OBJECT)) {
+                     message.setObjectProperty(jmsPropName, value);
+                 } else {
+                     // Report the offending type attribute and its value, not the data attribute
+                     logger.warn("Attribute key '{}' for {} has value '{}', but expected one of: integer, string, object, byte, double, float, long, short, boolean; not adding this property",
+                             new Object[]{typeKey, flowFile, type});
+                 }
+             } catch (NumberFormatException e) {
+                 // Report the type that was actually declared; any numeric type can fail to parse
+                 logger.warn("Attribute key '{}' for {} has value '{}', but attribute key '{}' has value '{}'. Not adding this JMS property",
+                         new Object[]{key, flowFile, value, typeKey, type});
+             }
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
index 0000000,a8d9c18..cfd522c
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSFTP.java
@@@ -1,0 -1,85 +1,85 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.List;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processors.standard.util.SFTPTransfer;
+ 
+ /**
+  * Processor that sends FlowFiles to an SFTP server. The transfer work is
+  * delegated to the {@link SFTPTransfer} returned by
+  * {@link #getFileTransfer(ProcessContext)}.
+  */
+ @SupportsBatching
+ @Tags({"remote", "copy", "egress", "put", "sftp", "archive", "files"})
+ @CapabilityDescription("Sends FlowFiles to an SFTP Server")
+ public class PutSFTP extends PutFileTransfer<SFTPTransfer> {
+ 
+     // Unmodifiable list of supported descriptors; assigned in init()
+     private List<PropertyDescriptor> properties;
+ 
+     /**
+      * Builds the list of supported property descriptors, in display order.
+      *
+      * @param context the initialization context (not used here)
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         Collections.addAll(descriptors,
+                 SFTPTransfer.HOSTNAME,
+                 SFTPTransfer.PORT,
+                 SFTPTransfer.USERNAME,
+                 SFTPTransfer.PASSWORD,
+                 SFTPTransfer.PRIVATE_KEY_PATH,
+                 SFTPTransfer.PRIVATE_KEY_PASSPHRASE,
+                 SFTPTransfer.REMOTE_PATH,
+                 SFTPTransfer.CREATE_DIRECTORY,
+                 SFTPTransfer.BATCH_SIZE,
+                 SFTPTransfer.CONNECTION_TIMEOUT,
+                 SFTPTransfer.DATA_TIMEOUT,
+                 SFTPTransfer.CONFLICT_RESOLUTION,
+                 SFTPTransfer.REJECT_ZERO_BYTE,
+                 SFTPTransfer.DOT_RENAME,
+                 SFTPTransfer.TEMP_FILENAME,
+                 SFTPTransfer.HOST_KEY_FILE,
+                 SFTPTransfer.LAST_MODIFIED_TIME,
+                 SFTPTransfer.PERMISSIONS,
+                 SFTPTransfer.REMOTE_OWNER,
+                 SFTPTransfer.REMOTE_GROUP,
+                 SFTPTransfer.STRICT_HOST_KEY_CHECKING,
+                 SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT,
+                 SFTPTransfer.USE_COMPRESSION);
+         this.properties = Collections.unmodifiableList(descriptors);
+     }
+ 
+     /**
+      * @return the unmodifiable list of descriptors built by {@link #init}
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     /**
+      * Resolves a dynamic property by name. The "disable directory listing"
+      * descriptor is matched case-insensitively; every other name is handed to
+      * the superclass.
+      *
+      * @param propertyDescriptorName the name of the dynamic property
+      * @return the matching descriptor, or whatever the superclass returns
+      */
+     @Override
+     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
+         final String disableListingName = SFTPTransfer.DISABLE_DIRECTORY_LISTING.getName();
+         if (!disableListingName.equalsIgnoreCase(propertyDescriptorName)) {
+             return super.getSupportedDynamicPropertyDescriptor(propertyDescriptorName);
+         }
+         return SFTPTransfer.DISABLE_DIRECTORY_LISTING;
+     }
+ 
+     /**
+      * @param context the process context passed to the transfer
+      * @return a new {@link SFTPTransfer} bound to the context and this processor's logger
+      */
+     @Override
+     protected SFTPTransfer getFileTransfer(final ProcessContext context) {
+         return new SFTPTransfer(context, getLogger());
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index 0000000,ae5350b..111dead
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@@ -1,0 -1,289 +1,289 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
+ import java.io.BufferedWriter;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.Validator;
+ import org.apache.nifi.expression.AttributeValueDecorator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.processor.util.FlowFileFilters;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.processors.standard.util.NLKBufferedReader;
+ import org.apache.nifi.util.StopWatch;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.InputStreamReader;
+ import java.io.OutputStream;
+ import java.io.OutputStreamWriter;
+ import java.nio.charset.Charset;
+ import java.util.*;
+ import java.util.concurrent.TimeUnit;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"Text", "Regular Expression", "Update", "Change", "Replace", "Modify", 
"Regex"})
+ @CapabilityDescription("Updates the content of a FlowFile by evaluating a 
Regular Expression against it and replacing the section of the content that 
matches the Regular Expression with some alternate value.")
+ public class ReplaceText extends AbstractProcessor {
+ 
+     //Constants
+     public static final String LINE_BY_LINE = "Line-by-Line";
+     public static final String ENTIRE_TEXT = "Entire text";
+     private final Pattern backReferencePattern = Pattern.compile("\\$(\\d+)");
+     private static final byte[] ZERO_BYTE_BUFFER = new byte[0];
+     // Properties
+     public static final PropertyDescriptor REGEX = new 
PropertyDescriptor.Builder()
+             .name("Regular Expression")
+             .description("The Regular Expression to search for in the 
FlowFile content")
+             .required(true)
+             .addValidator(StandardValidators.createRegexValidator(0, 
Integer.MAX_VALUE, true))
+             .expressionLanguageSupported(true)
+             .defaultValue("(.*)")
+             .build();
+     public static final PropertyDescriptor REPLACEMENT_VALUE = new 
PropertyDescriptor.Builder()
+             .name("Replacement Value")
+             .description("The value to replace the regular expression with. 
Back-references to Regular Expression capturing groups are supported, but 
back-references that reference capturing groups that do not exist in the 
regular expression will be treated as literal value.")
+             .required(true)
+             .defaultValue("$1")
+             .addValidator(Validator.VALID)
+             .expressionLanguageSupported(true)
+             .build();
+     public static final PropertyDescriptor CHARACTER_SET = new 
PropertyDescriptor.Builder()
+             .name("Character Set")
+             .description("The Character Set in which the file is encoded")
+             .required(true)
+             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+             .defaultValue("UTF-8")
+             .build();
+     public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+             .name("Maximum Buffer Size")
+             .description("Specifies the maximum amount of data to buffer (per 
file or per line, depending on the Evaluation Mode) in order to apply the 
regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the 
FlowFile is larger than this value, the FlowFile will be routed to 'failure'. "
+                     + "In 'Line-by-Line' Mode, if a single line is larger 
than this value, the FlowFile will be routed to 'failure'. A default value of 1 
MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a 
value such as 8 KB or 16 KB is suggested. This value is ignored and the buffer 
is not used if 'Regular Expression' is set to '.*'")
+             .required(true)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("1 MB")
+             .build();
+     public static final PropertyDescriptor EVALUATION_MODE = new 
PropertyDescriptor.Builder()
+             .name("Evaluation Mode")
+             .description("Evaluate the 'Regular Expression' against each line 
(Line-by-Line) or buffer the entire file into memory (Entire Text) and then 
evaluate the 'Regular Expression'.")
+             .allowableValues(LINE_BY_LINE, ENTIRE_TEXT)
+             .defaultValue(ENTIRE_TEXT)
+             .required(true)
+             .build();
+     // Relationships
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("FlowFiles that have been 
successfully updated are routed to this relationship, as well as FlowFiles 
whose content does not match the given Regular Expression").build();
+     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("FlowFiles that could not be 
updated are routed to this relationship").build();
+     //
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(REGEX);
+         properties.add(REPLACEMENT_VALUE);
+         properties.add(CHARACTER_SET);
+         properties.add(MAX_BUFFER_SIZE);
+         properties.add(EVALUATION_MODE);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+         final List<FlowFile> flowFiles = 
session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 100));
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+         final String unsubstitutedRegex = 
context.getProperty(REGEX).getValue();
+         String unsubstitutedReplacement = 
context.getProperty(REPLACEMENT_VALUE).getValue();
+         if (unsubstitutedRegex.equals("(.*)") && 
unsubstitutedReplacement.equals("$1")) {
+             // This pattern says replace content with itself. We can highly 
optimize this process by simply transferring
+             // all FlowFiles to the 'success' relationship
+             session.transfer(flowFiles, REL_SUCCESS);
+             return;
+         }
+ 
+         final AttributeValueDecorator quotedAttributeDecorator = new 
AttributeValueDecorator() {
+             @Override
+             public String decorate(final String attributeValue) {
+                 return Pattern.quote(attributeValue);
+             }
+         };
+ 
+         final AttributeValueDecorator escapeBackRefDecorator = new 
AttributeValueDecorator() {
+             @Override
+             public String decorate(final String attributeValue) {
+                 return attributeValue.replace("$", "\\$");
+             }
+         };
+ 
+         final String regexValue = 
context.getProperty(REGEX).evaluateAttributeExpressions().getValue();
+         final int numCapturingGroups = 
Pattern.compile(regexValue).matcher("").groupCount();
+ 
+         final boolean skipBuffer = ".*".equals(unsubstitutedRegex);
+ 
+         final Charset charset = 
Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+         final int maxBufferSize = 
context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ 
+         final byte[] buffer = skipBuffer ? ZERO_BYTE_BUFFER : new 
byte[maxBufferSize];
+ 
+         final String evaluateMode = 
context.getProperty(EVALUATION_MODE).getValue();
+ 
+         for (FlowFile flowFile : flowFiles) {
+             if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
+                 if (flowFile.getSize() > maxBufferSize && !skipBuffer) {
+                     session.transfer(flowFile, REL_FAILURE);
+                     continue;
+                 }
+             }
+ 
+             String replacement = 
context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile, 
escapeBackRefDecorator).getValue();
+             final Matcher backRefMatcher = 
backReferencePattern.matcher(replacement);
+             while (backRefMatcher.find()) {
+                 final String backRefNum = backRefMatcher.group(1);
+                 if (backRefNum.startsWith("0")) {
+                     continue;
+                 }
+                 final int originalBackRefIndex = Integer.parseInt(backRefNum);
+                 int backRefIndex = originalBackRefIndex;
+ 
+                 // if we have a replacement value like $123, and we have less 
than 123 capturing groups, then 
+                 // we want to truncate the 3 and use capturing group 12; if 
we have less than 12 capturing groups,
+                 // then we want to truncate the 2 and use capturing group 1; 
if we don't have a capturing group then
+                 // we want to truncate the 1 and get 0.
+                 while (backRefIndex > numCapturingGroups && backRefIndex >= 
10) {
+                     backRefIndex /= 10;
+                 }
+ 
+                 if (backRefIndex > numCapturingGroups) {
+                     final StringBuilder sb = new 
StringBuilder(replacement.length() + 1);
+                     final int groupStart = backRefMatcher.start(1);
+ 
+                     sb.append(replacement.substring(0, groupStart - 1));
+                     sb.append("\\");
+                     sb.append(replacement.substring(groupStart - 1));
+                     replacement = sb.toString();
+                 }
+             }
+ 
+             replacement = replacement.replaceAll("(\\$\\D)", "\\\\$1");
+ 
+             // always match; just overwrite value with the replacement value; 
this optimization prevents us
+             // from reading the file at all.
+             final String replacementValue = replacement;
+             if (skipBuffer) {
+                 final StopWatch stopWatch = new StopWatch(true);
+                 if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
+                     flowFile = session.write(flowFile, new 
OutputStreamCallback() {
+                         @Override
+                         public void process(final OutputStream out) throws 
IOException {
+                             out.write(replacementValue.getBytes(charset));
+                         }
+                     });
+                 } else {
+                     flowFile = session.write(flowFile, new StreamCallback() {
+                         @Override
+                         public void process(final InputStream in, final 
OutputStream out) throws IOException {
+                             try (NLKBufferedReader br = new 
NLKBufferedReader(new InputStreamReader(in, charset), maxBufferSize);
+                                     BufferedWriter bw = new 
BufferedWriter(new OutputStreamWriter(out, charset));) {
+                                 while (null != br.readLine()) {
+                                     bw.write(replacementValue);
+                                 }
+                             }
+                         }
+                     });
+                 }
+                 session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                 session.transfer(flowFile, REL_SUCCESS);
+                 logger.info("Transferred {} to 'success'", new 
Object[]{flowFile});
+                 continue;
+             }
+ 
+             final StopWatch stopWatch = new StopWatch(true);
+             final String regex = 
context.getProperty(REGEX).evaluateAttributeExpressions(flowFile, 
quotedAttributeDecorator).getValue();
+ 
+             if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
+                 final int flowFileSize = (int) flowFile.getSize();
+                 flowFile = session.write(flowFile, new StreamCallback() {
+                     @Override
+                     public void process(final InputStream in, final 
OutputStream out) throws IOException {
+                         StreamUtils.fillBuffer(in, buffer, false);
+                         final String contentString = new String(buffer, 0, 
flowFileSize, charset);
+                         final String updatedValue = 
contentString.replaceAll(regex, replacementValue);
+                         out.write(updatedValue.getBytes(charset));
+                     }
+                 });
+             } else {
+                 flowFile = session.write(flowFile, new StreamCallback() {
+                     @Override
+                     public void process(final InputStream in, final 
OutputStream out) throws IOException {
+                         try (NLKBufferedReader br = new NLKBufferedReader(new 
InputStreamReader(in, charset), maxBufferSize);
+                                 BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out, charset));) {
+                             String oneLine;
+                             while (null != (oneLine = br.readLine())) {
+                                 final String updatedValue = 
oneLine.replaceAll(regex, replacementValue);
+                                 bw.write(updatedValue);
+                             }
+                         }
+                     }
+                 });
+             }
+ 
+             logger.info("Transferred {} to 'success'", new 
Object[]{flowFile});
+             session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+             session.transfer(flowFile, REL_SUCCESS);
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java
index 0000000,c99935b..c4dd83a
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceTextWithMapping.java
@@@ -1,0 -1,383 +1,383 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.BufferedReader;
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.InputStreamReader;
+ import java.io.OutputStream;
+ import java.nio.charset.Charset;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.regex.Matcher;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.expression.AttributeValueDecorator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.SupportsBatching;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.behavior.SupportsBatching;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.StopWatch;
+ 
+ import org.apache.commons.lang3.StringUtils;
+ 
+ @EventDriven
+ @SideEffectFree
+ @SupportsBatching
+ @Tags({"Text", "Regular Expression", "Update", "Change", "Replace", "Modify", 
"Regex", "Mapping"})
+ @CapabilityDescription("Updates the content of a FlowFile by evaluating a 
Regular Expression against it and replacing the section of the content that 
matches the Regular Expression with some alternate value provided in a mapping 
file.")
+ public class ReplaceTextWithMapping extends AbstractProcessor {
+ 
+     /** Regular expression applied to the FlowFile content; defaults to one run of non-whitespace. */
+     public static final PropertyDescriptor REGEX =
+             new PropertyDescriptor.Builder()
+                     .name("Regular Expression")
+                     .description("The Regular Expression to search for in the "
+                             + "FlowFile content")
+                     .required(true)
+                     .addValidator(StandardValidators.createRegexValidator(
+                             0, Integer.MAX_VALUE, true))
+                     .expressionLanguageSupported(true)
+                     .defaultValue("\\S+")
+                     .build();
+ 
+     /** Index of the capturing group whose text is used as the lookup key; 0 means the whole match. */
+     public static final PropertyDescriptor MATCHING_GROUP_FOR_LOOKUP_KEY =
+             new PropertyDescriptor.Builder()
+                     .name("Matching Group")
+                     .description("The number of the matching group of the provided "
+                             + "regex to replace with the corresponding value from the mapping file (if it "
+                             + "exists).")
+                     .addValidator(StandardValidators.INTEGER_VALIDATOR)
+                     .required(true)
+                     .expressionLanguageSupported(true)
+                     .defaultValue("0")
+                     .build();
+ 
+     /** Location of the mapping file that is read by {@code updateMapping}. */
+     public static final PropertyDescriptor MAPPING_FILE =
+             new PropertyDescriptor.Builder()
+                     .name("Mapping File")
+                     .description("The name of the file (including the full path) "
+                             + "containing the Mappings.")
+                     .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+                     .required(true)
+                     .build();
+ 
+     /** Minimum time between two checks of the mapping file's modification time. */
+     public static final PropertyDescriptor MAPPING_FILE_REFRESH_INTERVAL =
+             new PropertyDescriptor.Builder()
+                     .name("Mapping File Refresh Interval")
+                     .description("The polling interval in seconds to check for "
+                             + "updates to the mapping file. The default is 60s.")
+                     .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+                     .required(true)
+                     .defaultValue("60s")
+                     .build();
+ 
+     /** Character set used to decode the FlowFile content and to encode the rewritten content. */
+     public static final PropertyDescriptor CHARACTER_SET =
+             new PropertyDescriptor.Builder()
+                     .name("Character Set")
+                     .description("The Character Set in which the file is encoded")
+                     .required(true)
+                     .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+                     .defaultValue("UTF-8")
+                     .build();
+ 
+     /** Upper bound on the FlowFile size; {@code onTrigger} routes anything larger to 'failure'. */
+     public static final PropertyDescriptor MAX_BUFFER_SIZE =
+             new PropertyDescriptor.Builder()
+                     .name("Maximum Buffer Size")
+                     .description("Specifies the maximum amount of data to buffer (per "
+                             + "file) in order to apply the regular expressions. If a FlowFile is larger than "
+                             + "this value, the FlowFile will be routed to 'failure'")
+                     .required(true)
+                     .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+                     .defaultValue("1 MB")
+                     .build();
+ 
+     /** Destination of every FlowFile whose content was processed, whether or not anything matched. */
+     public static final Relationship REL_SUCCESS =
+             new Relationship.Builder()
+                     .name("success")
+                     .description("FlowFiles that have been "
+                             + "successfully updated are routed to this relationship, as well as FlowFiles "
+                             + "whose content does not match the given Regular Expression")
+                     .build();
+ 
+     /** Destination of FlowFiles that {@code onTrigger} refuses to process. */
+     public static final Relationship REL_FAILURE =
+             new Relationship.Builder()
+                     .name("failure")
+                     .description("FlowFiles that could not be "
+                             + "updated are routed to this relationship")
+                     .build();
+ 
+     /**
+      * Finds {@code $<digits>} preceded by any single character other than a
+      * backslash; group 1 holds the digits. The pattern is immutable and
+      * thread-safe, so it is compiled once per class rather than once per
+      * processor instance. The field name is kept so existing references
+      * continue to compile.
+      */
+     private static final Pattern backReferencePattern =
+             Pattern.compile("[^\\\\]\\$(\\d+)");
+ 
+     /** Supported properties; assigned in {@code init}. */
+     private List<PropertyDescriptor> properties;
+     /** Supported relationships; assigned in {@code init}. */
+     private Set<Relationship> relationships;
+ 
+     /** Lets only one thread at a time check or reload the mapping file. */
+     private final ReentrantLock processorLock = new ReentrantLock();
+     /** Modification time (epoch millis) of the mapping file that was loaded last. */
+     private final AtomicLong lastModified = new AtomicLong(0L);
+     /** Time (epoch seconds) at which the mapping file was last checked. */
+     final AtomicLong mappingTestTime = new AtomicLong(0);
+     /** Current mapping snapshot; starts out empty until the first successful load. */
+     private final AtomicReference<ConfigurationState> configurationStateRef =
+             new AtomicReference<>(new ConfigurationState(null));
+ 
+     /**
+      * Adds a cross-property check on top of the inherited validation: the
+      * configured matching group must be a group that the configured regular
+      * expression actually defines.
+      *
+      * @param context gives access to the configured property values
+      * @return the inherited validation results plus, when the matching group
+      *         is negative or larger than the number of capturing groups, one
+      *         invalid result describing the problem
+      */
+     @Override
+     protected Collection<ValidationResult> customValidate(
+             final ValidationContext context) {
+         final List<ValidationResult> errors =
+                 new ArrayList<>(super.customValidate(context));
+ 
+         final String regexValue = context.getProperty(REGEX)
+                 .evaluateAttributeExpressions().getValue();
+         final int numCapturingGroups =
+                 Pattern.compile(regexValue).matcher("").groupCount();
+         final int groupToMatch = context.getProperty(MATCHING_GROUP_FOR_LOOKUP_KEY)
+                 .evaluateAttributeExpressions().asInteger();
+ 
+         // The property's own validator accepts any integer, including negative
+         // ones. Matcher.group(int) throws IndexOutOfBoundsException for an
+         // index below zero just as it does for one past the last group, so
+         // both ends of the range are rejected here.
+         if (groupToMatch < 0 || groupToMatch > numCapturingGroups) {
+             errors.add(new ValidationResult.Builder()
+                     .subject("Insufficient Matching Groups")
+                     .valid(false)
+                     .explanation("The specified matching group does "
+                             + "not exist for the regular expression provided")
+                     .build());
+         }
+         return errors;
+     }
+ 
+     /**
+      * Builds the read-only collections that are later handed out by
+      * {@code getRelationships} and {@code getSupportedPropertyDescriptors}.
+      *
+      * @param context initialization context; not used by this method
+      */
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> routes = new HashSet<>();
+         Collections.addAll(routes, REL_SUCCESS, REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(routes);
+ 
+         // List order is preserved exactly as declared here.
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         Collections.addAll(descriptors,
+                 REGEX,
+                 MATCHING_GROUP_FOR_LOOKUP_KEY,
+                 MAPPING_FILE,
+                 MAPPING_FILE_REFRESH_INTERVAL,
+                 CHARACTER_SET,
+                 MAX_BUFFER_SIZE);
+         this.properties = Collections.unmodifiableList(descriptors);
+     }
+ 
+     /**
+      * @return the unmodifiable property list assembled in {@code init}
+      */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return this.properties;
+     }
+ 
+     /**
+      * @return the unmodifiable relationship set assembled in {@code init}
+      */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return this.relationships;
+     }
+ 
+     /**
+      * Refreshes the mapping if it is due, then pulls up to five FlowFiles and
+      * rewrites the content of each one through a {@code ReplaceTextCallback}.
+      * FlowFiles larger than the configured Maximum Buffer Size are routed to
+      * 'failure' untouched; all others are routed to 'success' after a
+      * content-modified provenance event has been recorded.
+      *
+      * @param context provides the configured property values
+      * @param session the session the FlowFiles are read from and transferred in
+      * @throws ProcessException declared by the overridden method
+      */
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession session)
+             throws ProcessException {
+         updateMapping(context);
+         final List<FlowFile> flowFiles = session.get(5);
+         if (flowFiles.isEmpty()) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+ 
+         final int maxBufferSize =
+                 context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ 
+         for (FlowFile flowFile : flowFiles) {
+             if (flowFile.getSize() > maxBufferSize) {
+                 // Say why the FlowFile failed; without this the routing
+                 // decision leaves no trace in the log.
+                 logger.warn("Routing {} to 'failure' because its size exceeds the "
+                         + "Maximum Buffer Size of {} bytes",
+                         new Object[]{flowFile, maxBufferSize});
+                 session.transfer(flowFile, REL_FAILURE);
+                 continue;
+             }
+ 
+             final StopWatch stopWatch = new StopWatch(true);
+ 
+             flowFile = session.write(flowFile,
+                     new ReplaceTextCallback(context, flowFile, maxBufferSize));
+ 
+             logger.info("Transferred {} to 'success'", new Object[]{flowFile});
+             session.getProvenanceReporter().modifyContent(flowFile,
+                     stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+             session.transfer(flowFile, REL_SUCCESS);
+         }
+     }
+ 
+     /**
+      * Prepares a mapping value for use as a {@code Matcher.appendReplacement}
+      * replacement string. A {@code $<digits>} reference found by
+      * {@code backReferencePattern} whose number is larger than
+      * {@code numCapturingGroups} gets a backslash inserted in front of its
+      * {@code $}; afterwards every {@code $} followed by a non-digit character
+      * is backslash-escaped as well.
+      *
+      * NOTE(review): {@code backReferencePattern} requires one non-backslash
+      * character before the {@code $} and consumes it, so a reference at
+      * offset 0, or one directly following another matched reference, is not
+      * inspected by the loop below.
+      *
+      * @param rawReplacementValue the value taken from the mapping
+      * @param numCapturingGroups number of capturing groups in the search regex
+      * @return the value with unresolvable references escaped
+      */
+     protected String fillReplacementValueBackReferences(String rawReplacementValue,
+             int numCapturingGroups) {
+         final StringBuilder escaped = new StringBuilder(rawReplacementValue);
+         // The matcher scans the untouched input, so its offsets have to be
+         // shifted by the number of backslashes inserted so far.
+         final Matcher references = backReferencePattern.matcher(rawReplacementValue);
+         int insertedBackslashes = 0;
+         while (references.find()) {
+             final int referencedGroup = Integer.parseInt(references.group(1));
+             final boolean resolvable =
+                     referencedGroup >= 0 && referencedGroup <= numCapturingGroups;
+             if (resolvable) {
+                 continue;
+             }
+             // start(1) - 1 is the position of the '$' in the original text.
+             final int dollarOffset = references.start(1) - 1 + insertedBackslashes;
+             escaped.insert(dollarOffset, '\\');
+             insertedBackslashes++;
+         }
+ 
+         return escaped.toString().replaceAll("(\\$\\D)", "\\\\$1");
+     }
+ 
+     /**
+      * Reloads the mapping file when the refresh interval has elapsed and the
+      * file has been modified since the last successful load. If another thread
+      * currently holds {@code processorLock}, the method returns immediately
+      * without checking anything. Failures are logged and leave the current
+      * mapping in place.
+      *
+      * @param context provides the mapping file name and the refresh interval
+      */
+     private void updateMapping(final ProcessContext context) {
+         if (!processorLock.tryLock()) {
+             return;
+         }
+         final ProcessorLog logger = getLogger();
+         try {
+             // if not queried mapping file lastUpdate time in
+             // mappingRefreshPeriodSecs, do so.
+             long currentTimeSecs = System.currentTimeMillis() / 1000;
+             long mappingRefreshPeriodSecs = context.getProperty(MAPPING_FILE_REFRESH_INTERVAL)
+                     .asTimePeriod(TimeUnit.SECONDS);
+ 
+             boolean retry = (currentTimeSecs > (mappingTestTime.get() + mappingRefreshPeriodSecs));
+             if (retry) {
+                 mappingTestTime.set(System.currentTimeMillis() / 1000);
+                 // see if the mapping file needs to be reloaded
+                 final String fileName = context.getProperty(MAPPING_FILE).getValue();
+                 final File file = new File(fileName);
+                 if (file.exists() && file.isFile() && file.canRead()) {
+                     // Sample the timestamp once so the value compared is the
+                     // value remembered, even if the file changes meanwhile.
+                     final long fileModified = file.lastModified();
+                     if (fileModified > lastModified.get()) {
+                         try (FileInputStream is = new FileInputStream(file)) {
+                             logger.info("Reloading mapping file: {}", new Object[]{fileName});
+ 
+                             final Map<String, String> mapping = loadMappingFile(is);
+                             final ConfigurationState newState = new ConfigurationState(mapping);
+                             configurationStateRef.set(newState);
+                             // Remember the timestamp only after a successful
+                             // load; a failed read is then retried at the next
+                             // refresh instead of being skipped until the file
+                             // changes again.
+                             lastModified.set(fileModified);
+                         } catch (IOException e) {
+                             logger.error("Error reading mapping file: {}", new Object[]{e.getMessage()});
+                         }
+                     }
+                 } else {
+                     logger.error("Mapping file does not exist or is not readable: {}",
+                             new Object[]{fileName});
+                 }
+             }
+         } catch (Exception e) {
+             logger.error("Error loading mapping file: {}", new Object[]{e.getMessage()});
+         } finally {
+             processorLock.unlock();
+         }
+     }
+ 
+     /**
+      * Parses mapping entries from a stream, one entry per line. A line is cut
+      * at the first run of tab or space characters: the text before it is the
+      * key and the trimmed remainder is the value. A line holding only a key
+      * maps that key to the empty string; a line with no tokens is ignored.
+      * A key that occurs again replaces the earlier entry.
+      *
+      * The stream is decoded with the platform default charset and is not
+      * closed by this method.
+      *
+      * @param is the stream to read the mapping lines from
+      * @return a new map holding the parsed entries
+      * @throws IOException if reading from the stream fails
+      */
+     protected Map<String, String> loadMappingFile(InputStream is) throws IOException {
+         final Map<String, String> entries = new HashMap<>();
+         final BufferedReader lines = new BufferedReader(new InputStreamReader(is));
+         for (String row = lines.readLine(); row != null; row = lines.readLine()) {
+             final String[] parts = StringUtils.split(row, "\t ", 2);
+             if (parts.length < 1 || parts.length > 2) {
+                 continue;
+             }
+             // support key with empty value
+             final String value = (parts.length == 2) ? parts[1].trim() : "";
+             entries.put(parts[0].trim(), value);
+         }
+         return entries;
+     }
+ 
+     /**
+      * Snapshot of the key/value mapping. The constructor copies the supplied
+      * map, so later changes to that map do not affect the snapshot.
+      */
+     public static class ConfigurationState {
+ 
+         final Map<String, String> mapping;
+ 
+         /**
+          * @param mapping entries to copy; {@code null} yields an empty snapshot
+          */
+         public ConfigurationState(final Map<String, String> mapping) {
+             final Map<String, String> copy = new HashMap<>();
+             if (mapping != null) {
+                 copy.putAll(mapping);
+             }
+             this.mapping = copy;
+         }
+ 
+         /**
+          * @return a read-only view of the entries
+          */
+         public Map<String, String> getMapping() {
+             final Map<String, String> readOnlyView = Collections.unmodifiableMap(mapping);
+             return readOnlyView;
+         }
+ 
+         /**
+          * @return {@code true} when at least one entry is present
+          */
+         public boolean isConfigured() {
+             return mapping.size() > 0;
+         }
+     }
+ 
+     /**
+      * Stream callback that rewrites the content of one FlowFile: for every
+      * match of the configured regular expression, the text of the configured
+      * matching group is looked up in the current mapping and, when an entry
+      * exists, replaced by the mapped value. Matches without an entry are
+      * copied through unchanged.
+      */
+     private final class ReplaceTextCallback implements StreamCallback {
+ 
+         private final Charset charset;
+         private final byte[] buffer;
+         private final Pattern pattern;
+         private final FlowFile flowFile;
+         private final int numCapturingGroups;
+         private final int groupToMatch;
+ 
+         // Attribute values substituted into the regex are quoted so that they
+         // are matched literally.
+         private final AttributeValueDecorator quotedAttributeDecorator = new AttributeValueDecorator() {
+             @Override
+             public String decorate(final String attributeValue) {
+                 return Pattern.quote(attributeValue);
+             }
+         };
+ 
+         /**
+          * @param context provides the regex, character set and matching group
+          * @param flowFile the FlowFile whose content will be rewritten
+          * @param maxBufferSize size in bytes of the buffer the content is read into
+          */
+         private ReplaceTextCallback(ProcessContext context, FlowFile flowFile, int maxBufferSize) {
+             final String regex = context.getProperty(REGEX)
+                     .evaluateAttributeExpressions(flowFile, quotedAttributeDecorator).getValue();
+             // Compile once and take the group count from the very pattern that
+             // is applied to the content. Counting groups on a second,
+             // attribute-less evaluation of the property can disagree with the
+             // applied regex or fail to compile.
+             this.pattern = Pattern.compile(regex);
+             this.numCapturingGroups = this.pattern.matcher("").groupCount();
+             this.flowFile = flowFile;
+ 
+             this.charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+ 
+             this.buffer = new byte[maxBufferSize];
+ 
+             this.groupToMatch = context.getProperty(MATCHING_GROUP_FOR_LOOKUP_KEY)
+                     .evaluateAttributeExpressions().asInteger();
+         }
+ 
+         /**
+          * Reads the content into the buffer, applies the replacements and
+          * writes the result.
+          *
+          * @param in the current FlowFile content
+          * @param out receives the rewritten content
+          * @throws IOException if reading or writing fails
+          */
+         @Override
+         public void process(final InputStream in, final OutputStream out) throws IOException {
+ 
+             final Map<String, String> mapping = configurationStateRef.get().getMapping();
+ 
+             StreamUtils.fillBuffer(in, buffer, false);
+ 
+             final int flowFileSize = (int) flowFile.getSize();
+ 
+             final String contentString = new String(buffer, 0, flowFileSize, charset);
+ 
+             final Matcher matcher = pattern.matcher(contentString);
+ 
+             if (!matcher.find()) {
+                 // Nothing matched: pass the content through as decoded.
+                 out.write(contentString.getBytes(charset));
+                 return;
+             }
+ 
+             // appendReplacement requires a StringBuffer on the Java versions
+             // this file targets.
+             final StringBuffer sb = new StringBuffer();
+             do {
+                 String matched = matcher.group(groupToMatch);
+                 String rv = mapping.get(matched);
+ 
+                 if (rv == null) {
+                     // No mapping entry: re-emit the match, escaping '$' so it
+                     // is not read as a group reference.
+                     String replacement = matcher.group().replace("$", "\\$");
+                     matcher.appendReplacement(sb, replacement);
+                 } else {
+                     String allRegexMatched = matcher.group(); //this is everything that matched the regex
+ 
+                     // Offsets of the lookup group relative to the whole match.
+                     int scaledStart = matcher.start(groupToMatch) - matcher.start();
+                     int scaledEnd = scaledStart + matcher.group(groupToMatch).length();
+ 
+                     StringBuilder replacementBuilder = new StringBuilder();
+ 
+                     replacementBuilder.append(
+                             allRegexMatched.substring(0, scaledStart).replace("$", "\\$"));
+                     replacementBuilder.append(
+                             fillReplacementValueBackReferences(rv, numCapturingGroups));
+                     replacementBuilder.append(
+                             allRegexMatched.substring(scaledEnd).replace("$", "\\$"));
+ 
+                     matcher.appendReplacement(sb, replacementBuilder.toString());
+                 }
+             } while (matcher.find());
+             matcher.appendTail(sb);
+             out.write(sb.toString().getBytes(charset));
+         }
+     }
+ }

Reply via email to