NIFI-810: Addressed several checkstyle violations

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ccfb57fe
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ccfb57fe
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ccfb57fe

Branch: refs/heads/NIFI-810-InputRequirement
Commit: ccfb57fe9ff43f11319dcb1625bfc78b1d88f56a
Parents: b974445
Author: Mark Payne <[email protected]>
Authored: Wed Oct 7 17:48:51 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Wed Oct 7 17:48:51 2015 -0400

----------------------------------------------------------------------
 .../annotation/behavior/InputRequirement.java   |  70 +-
 .../nifi/processors/aws/s3/PutS3Object.java     |  46 +-
 .../apache/nifi/controller/ProcessorNode.java   |  88 +--
 .../nifi/controller/StandardProcessorNode.java  |  10 +-
 .../standard/Base64EncodeContent.java           | 168 ++---
 .../nifi/processors/standard/ControlRate.java   | 672 +++++++++----------
 6 files changed, 534 insertions(+), 520 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
index 97e6b88..13f442c 100644
--- a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
+++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/InputRequirement.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.annotation.behavior;
 
 import java.lang.annotation.Documented;
@@ -21,31 +37,31 @@ import java.lang.annotation.Target;
 @Retention(RetentionPolicy.RUNTIME)
 @Inherited
 public @interface InputRequirement {
-       Requirement value();
-       
-       public static enum Requirement {
-               /**
-                * This value is used to indicate that the Processor requires 
input from other Processors
-                * in order to run. As a result, the Processor will not be 
valid if it does not have any
-                * incoming connections.
-                */
-               INPUT_REQUIRED,
-               
-               /**
-                * This value is used to indicate that the Processor will 
consume data from an incoming
-                * connection but does not require an incoming connection in 
order to perform its task.
-                * If the {@link InputRequirement} annotation is not present, 
this is the default value
-                * that is used.
-                */
-               INPUT_ALLOWED,
-               
-               /**
-                * This value is used to indicate that the Processor is a 
"Source Processor" and does
-                * not accept incoming connections. Because the Processor does 
not pull FlowFiles from
-                * an incoming connection, it can be very confusing for users 
who create incoming connections
-                * to the Processor. As a result, this value can be used in 
order to clarify that incoming
-                * connections will not be used. This prevents the user from 
even creating such a connection.
-                */
-               INPUT_FORBIDDEN;
-       }
+    Requirement value();
+
+    public static enum Requirement {
+        /**
+         * This value is used to indicate that the Processor requires input 
from other Processors
+         * in order to run. As a result, the Processor will not be valid if it 
does not have any
+         * incoming connections.
+         */
+        INPUT_REQUIRED,
+
+        /**
+         * This value is used to indicate that the Processor will consume data 
from an incoming
+         * connection but does not require an incoming connection in order to 
perform its task.
+         * If the {@link InputRequirement} annotation is not present, this is 
the default value
+         * that is used.
+         */
+        INPUT_ALLOWED,
+
+        /**
+         * This value is used to indicate that the Processor is a "Source 
Processor" and does
+         * not accept incoming connections. Because the Processor does not 
pull FlowFiles from
+         * an incoming connection, it can be very confusing for users who 
create incoming connections
+         * to the Processor. As a result, this value can be used in order to 
clarify that incoming
+         * connections will not be used. This prevents the user from even 
creating such a connection.
+         */
+        INPUT_FORBIDDEN;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 7398c4e..c7212f5 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -59,10 +59,8 @@ import com.amazonaws.services.s3.model.StorageClass;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
 @CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
-@DynamicProperty(name = "The name of a User-Defined Metadata field to add to 
the S3 Object",
-        value = "The value of a User-Defined Metadata field to add to the S3 
Object",
-        description = "Allows user-defined metadata to be added to the S3 
object as key/value pairs",
-        supportsExpressionLanguage = true)
+@DynamicProperty(name = "The name of a User-Defined Metadata field to add to 
the S3 Object", value = "The value of a User-Defined Metadata field to add to 
the S3 Object",
+    description = "Allows user-defined metadata to be added to the S3 object 
as key/value pairs", supportsExpressionLanguage = true)
 @ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's 
filename as the filename for the S3 object")
 @WritesAttributes({
     @WritesAttribute(attribute = "s3.version", description = "The version of 
the S3 Object that was put to S3"),
@@ -72,22 +70,22 @@ import com.amazonaws.services.s3.model.StorageClass;
 public class PutS3Object extends AbstractS3Processor {
 
     public static final PropertyDescriptor EXPIRATION_RULE_ID = new 
PropertyDescriptor.Builder()
-            .name("Expiration Time Rule")
-            .required(false)
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
+        .name("Expiration Time Rule")
+        .required(false)
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
 
     public static final PropertyDescriptor STORAGE_CLASS = new 
PropertyDescriptor.Builder()
-            .name("Storage Class")
-            .required(true)
-            .allowableValues(StorageClass.Standard.name(), 
StorageClass.ReducedRedundancy.name())
-            .defaultValue(StorageClass.Standard.name())
-            .build();
+        .name("Storage Class")
+        .required(true)
+        .allowableValues(StorageClass.Standard.name(), 
StorageClass.ReducedRedundancy.name())
+        .defaultValue(StorageClass.Standard.name())
+        .build();
 
     public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-            Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, 
CREDENTAILS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
-                    FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, 
READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+        Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTAILS_FILE, 
STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
+            FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, 
READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -97,15 +95,15 @@ public class PutS3Object extends AbstractS3Processor {
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                .expressionLanguageSupported(true)
-                .dynamic(true)
-                .build();
+            .name(propertyDescriptorName)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .dynamic(true)
+            .build();
     }
 
     @Override
-       public void onTrigger(final ProcessContext context, final 
ProcessSession session) {
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
@@ -176,9 +174,9 @@ public class PutS3Object extends AbstractS3Processor {
             final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().send(flowFile, url, millis);
 
-            getLogger().info("Successfully put {} to Amazon S3 in {} 
milliseconds", new Object[]{ff, millis});
+            getLogger().info("Successfully put {} to Amazon S3 in {} 
milliseconds", new Object[] {ff, millis});
         } catch (final ProcessException | AmazonClientException pe) {
-            getLogger().error("Failed to put {} to Amazon S3 due to {}", new 
Object[]{flowFile, pe});
+            getLogger().error("Failed to put {} to Amazon S3 due to {}", new 
Object[] {flowFile, pe});
             session.transfer(flowFile, REL_FAILURE);
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 2f72d0f..d340c77 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -31,72 +31,72 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
 
 public abstract class ProcessorNode extends AbstractConfiguredComponent 
implements Connectable {
 
-       public ProcessorNode(final Processor processor, final String id,
-               final ValidationContextFactory validationContextFactory, final 
ControllerServiceProvider serviceProvider) {
-               super(processor, id, validationContextFactory, serviceProvider);
-       }
+    public ProcessorNode(final Processor processor, final String id,
+        final ValidationContextFactory validationContextFactory, final 
ControllerServiceProvider serviceProvider) {
+        super(processor, id, validationContextFactory, serviceProvider);
+    }
 
-       public abstract boolean isIsolated();
+    public abstract boolean isIsolated();
 
-       public abstract boolean isTriggerWhenAnyDestinationAvailable();
+    public abstract boolean isTriggerWhenAnyDestinationAvailable();
 
-       @Override
-       public abstract boolean isSideEffectFree();
+    @Override
+    public abstract boolean isSideEffectFree();
 
-       public abstract boolean isTriggeredSerially();
+    public abstract boolean isTriggeredSerially();
 
-       public abstract boolean isEventDrivenSupported();
+    public abstract boolean isEventDrivenSupported();
 
-       public abstract boolean isHighThroughputSupported();
+    public abstract boolean isHighThroughputSupported();
 
-       public abstract Requirement getInputRequirement();
+    public abstract Requirement getInputRequirement();
 
-       @Override
-       public abstract boolean isValid();
+    @Override
+    public abstract boolean isValid();
 
-       public abstract void setScheduledState(ScheduledState scheduledState);
+    public abstract void setScheduledState(ScheduledState scheduledState);
 
-       public abstract void setBulletinLevel(LogLevel bulletinLevel);
+    public abstract void setBulletinLevel(LogLevel bulletinLevel);
 
-       public abstract LogLevel getBulletinLevel();
+    public abstract LogLevel getBulletinLevel();
 
-       public abstract Processor getProcessor();
+    public abstract Processor getProcessor();
 
-       public abstract void yield(long period, TimeUnit timeUnit);
+    public abstract void yield(long period, TimeUnit timeUnit);
 
-       public abstract void setAutoTerminatedRelationships(Set<Relationship> 
relationships);
+    public abstract void setAutoTerminatedRelationships(Set<Relationship> 
relationships);
 
-       public abstract Set<Relationship> getAutoTerminatedRelationships();
+    public abstract Set<Relationship> getAutoTerminatedRelationships();
 
-       public abstract void setSchedulingStrategy(SchedulingStrategy 
schedulingStrategy);
+    public abstract void setSchedulingStrategy(SchedulingStrategy 
schedulingStrategy);
 
-       @Override
-       public abstract SchedulingStrategy getSchedulingStrategy();
+    @Override
+    public abstract SchedulingStrategy getSchedulingStrategy();
 
-       public abstract void setRunDuration(long duration, TimeUnit timeUnit);
+    public abstract void setRunDuration(long duration, TimeUnit timeUnit);
 
-       public abstract long getRunDuration(TimeUnit timeUnit);
+    public abstract long getRunDuration(TimeUnit timeUnit);
 
-       public abstract Map<String, String> getStyle();
+    public abstract Map<String, String> getStyle();
 
-       public abstract void setStyle(Map<String, String> style);
+    public abstract void setStyle(Map<String, String> style);
 
-       /**
-        * @return the number of threads (concurrent tasks) currently being 
used by
-        * this Processor
-        */
-       public abstract int getActiveThreadCount();
+    /**
+     * @return the number of threads (concurrent tasks) currently being used by
+     *         this Processor
+     */
+    public abstract int getActiveThreadCount();
 
-       /**
-        * Verifies that this Processor can be started if the provided set of
-        * services are enabled. This is introduced because we need to verify 
that
-        * all components can be started before starting any of them. In order 
to do
-        * that, we need to know that this component can be started if the given
-        * services are enabled, as we will then enable the given services 
before
-        * starting this component.
-        *
-        * @param ignoredReferences to ignore
-        */
-       public abstract void verifyCanStart(Set<ControllerServiceNode> 
ignoredReferences);
+    /**
+     * Verifies that this Processor can be started if the provided set of
+     * services are enabled. This is introduced because we need to verify that
+     * all components can be started before starting any of them. In order to 
do
+     * that, we need to know that this component can be started if the given
+     * services are enabled, as we will then enable the given services before
+     * starting this component.
+     *
+     * @param ignoredReferences to ignore
+     */
+    public abstract void verifyCanStart(Set<ControllerServiceNode> 
ignoredReferences);
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index f69c510..ad22c6d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -1306,9 +1306,9 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
     }
 
     @Override
-       public void verifyModifiable() throws IllegalStateException {
-               if (isRunning()) {
-                       throw new IllegalStateException("Cannot modify 
Processor configuration while the Processor is running");
-               }
-       }>>>>>>>2215 bc848b7db395b2ca9ac7cc4dc10891393721
+    public void verifyModifiable() throws IllegalStateException {
+        if (isRunning()) {
+            throw new IllegalStateException("Cannot modify Processor 
configuration while the Processor is running");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
index 816b407..db45109 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
@@ -56,99 +56,99 @@ import org.apache.nifi.util.StopWatch;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class Base64EncodeContent extends AbstractProcessor {
 
-       public static final String ENCODE_MODE = "Encode";
-       public static final String DECODE_MODE = "Decode";
+    public static final String ENCODE_MODE = "Encode";
+    public static final String DECODE_MODE = "Decode";
 
-       public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
-               .name("Mode")
-               .description("Specifies whether the content should be encoded 
or decoded")
-               .required(true)
-               .allowableValues(ENCODE_MODE, DECODE_MODE)
-               .defaultValue(ENCODE_MODE)
-               .build();
-       public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
-               .name("success")
-               .description("Any FlowFile that is successfully encoded or 
decoded will be routed to success")
-               .build();
-       public static final Relationship REL_FAILURE = new 
Relationship.Builder()
-               .name("failure")
-               .description("Any FlowFile that cannot be encoded or decoded 
will be routed to failure")
-               .build();
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+        .name("Mode")
+        .description("Specifies whether the content should be encoded or 
decoded")
+        .required(true)
+        .allowableValues(ENCODE_MODE, DECODE_MODE)
+        .defaultValue(ENCODE_MODE)
+        .build();
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("Any FlowFile that is successfully encoded or decoded 
will be routed to success")
+        .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("Any FlowFile that cannot be encoded or decoded will be 
routed to failure")
+        .build();
 
-       private List<PropertyDescriptor> properties;
-       private Set<Relationship> relationships;
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
 
-       @Override
-       protected void init(final ProcessorInitializationContext context) {
-               final List<PropertyDescriptor> properties = new ArrayList<>();
-               properties.add(MODE);
-               this.properties = Collections.unmodifiableList(properties);
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(MODE);
+        this.properties = Collections.unmodifiableList(properties);
 
-               final Set<Relationship> relationships = new HashSet<>();
-               relationships.add(REL_SUCCESS);
-               relationships.add(REL_FAILURE);
-               this.relationships = Collections.unmodifiableSet(relationships);
-       }
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
 
-       @Override
-       public Set<Relationship> getRelationships() {
-               return relationships;
-       }
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
 
-       @Override
-       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-               return properties;
-       }
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
 
-       @Override
-       public void onTrigger(final ProcessContext context, final 
ProcessSession session) {
-               FlowFile flowFile = session.get();
-               if (flowFile == null) {
-                       return;
-               }
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
 
-               final ProcessorLog logger = getLogger();
+        final ProcessorLog logger = getLogger();
 
-               boolean encode = 
context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
-               try {
-                       final StopWatch stopWatch = new StopWatch(true);
-                       if (encode) {
-                               flowFile = session.write(flowFile, new 
StreamCallback() {
-                                       @Override
-                                       public void process(InputStream in, 
OutputStream out) throws IOException {
-                                               try (Base64OutputStream bos = 
new Base64OutputStream(out)) {
-                                                       int len = -1;
-                                                       byte[] buf = new 
byte[8192];
-                                                       while ((len = 
in.read(buf)) > 0) {
-                                                               bos.write(buf, 
0, len);
-                                                       }
-                                                       bos.flush();
-                                               }
-                                       }
-                               });
-                       } else {
-                               flowFile = session.write(flowFile, new 
StreamCallback() {
-                                       @Override
-                                       public void process(InputStream in, 
OutputStream out) throws IOException {
-                                               try (Base64InputStream bis = 
new Base64InputStream(new ValidatingBase64InputStream(in))) {
-                                                       int len = -1;
-                                                       byte[] buf = new 
byte[8192];
-                                                       while ((len = 
bis.read(buf)) > 0) {
-                                                               out.write(buf, 
0, len);
-                                                       }
-                                                       out.flush();
-                                               }
-                                       }
-                               });
-                       }
+        boolean encode = 
context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
+        try {
+            final StopWatch stopWatch = new StopWatch(true);
+            if (encode) {
+                flowFile = session.write(flowFile, new StreamCallback() {
+                    @Override
+                    public void process(InputStream in, OutputStream out) 
throws IOException {
+                        try (Base64OutputStream bos = new 
Base64OutputStream(out)) {
+                            int len = -1;
+                            byte[] buf = new byte[8192];
+                            while ((len = in.read(buf)) > 0) {
+                                bos.write(buf, 0, len);
+                            }
+                            bos.flush();
+                        }
+                    }
+                });
+            } else {
+                flowFile = session.write(flowFile, new StreamCallback() {
+                    @Override
+                    public void process(InputStream in, OutputStream out) 
throws IOException {
+                        try (Base64InputStream bis = new Base64InputStream(new 
ValidatingBase64InputStream(in))) {
+                            int len = -1;
+                            byte[] buf = new byte[8192];
+                            while ((len = bis.read(buf)) > 0) {
+                                out.write(buf, 0, len);
+                            }
+                            out.flush();
+                        }
+                    }
+                });
+            }
 
-                       logger.info("Successfully {} {}", new Object[]{encode ? 
"encoded" : "decoded", flowFile});
-                       session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-                       session.transfer(flowFile, REL_SUCCESS);
-               } catch (ProcessException e) {
-                       logger.error("Failed to {} {} due to {}", new 
Object[]{encode ? "encode" : "decode", flowFile, e});
-                       session.transfer(flowFile, REL_FAILURE);
-               }
-       }
+            logger.info("Successfully {} {}", new Object[] {encode ? "encoded" 
: "decoded", flowFile});
+            session.getProvenanceReporter().modifyContent(flowFile, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (ProcessException e) {
+            logger.error("Failed to {} {} due to {}", new Object[] {encode ? 
"encode" : "decode", flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ccfb57fe/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
index a45c211..0847472 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java
@@ -61,340 +61,340 @@ import org.apache.nifi.util.timebuffer.TimedBuffer;
 @CapabilityDescription("Controls the rate at which data is transferred to 
follow-on processors.")
 public class ControlRate extends AbstractProcessor {
 
-       public static final String DATA_RATE = "data rate";
-       public static final String FLOWFILE_RATE = "flowfile count";
-       public static final String ATTRIBUTE_RATE = "attribute value";
-
-       public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new 
PropertyDescriptor.Builder()
-               .name("Rate Control Criteria")
-               .description("Indicates the criteria that is used to control 
the throughput rate. Changing this value resets the rate counters.")
-               .required(true)
-               .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
-               .defaultValue(DATA_RATE)
-               .build();
-       public static final PropertyDescriptor MAX_RATE = new 
PropertyDescriptor.Builder()
-               .name("Maximum Rate")
-               .description("The maximum rate at which data should pass 
through this processor. The format of this property is expected to be a "
-                       + "positive integer, or a Data Size (such as '1 MB') if 
Rate Control Criteria is set to 'data rate'.")
-               .required(true)
-               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // 
validated in customValidate b/c dependent on Rate Control Criteria
-               .build();
-       public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = 
new PropertyDescriptor.Builder()
-               .name("Rate Controlled Attribute")
-               .description("The name of an attribute whose values build 
toward the rate limit if Rate Control Criteria is set to 'attribute value'. "
-                       + "The value of the attribute referenced by this 
property must be a positive long, or the FlowFile will be routed to failure. "
-                       + "This value is ignored if Rate Control Criteria is 
not set to 'attribute value'. Changing this value resets the rate counters.")
-               .required(false)
-               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-               .expressionLanguageSupported(false)
-               .build();
-       public static final PropertyDescriptor TIME_PERIOD = new 
PropertyDescriptor.Builder()
-               .name("Time Duration")
-               .description("The amount of time to which the Maximum Data Size 
and Maximum Number of Files pertains. Changing this value resets the rate 
counters.")
-               .required(true)
-               .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
-               .defaultValue("1 min")
-               .build();
-       public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new 
PropertyDescriptor.Builder()
-               .name("Grouping Attribute")
-               .description("By default, a single \"throttle\" is used for all 
FlowFiles. If this value is specified, a separate throttle is used for "
-                       + "each value specified by the attribute with this 
name. Changing this value resets the rate counters.")
-               .required(false)
-               .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-               .expressionLanguageSupported(false)
-               .build();
-
-       public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
-               .name("success")
-               .description("All FlowFiles are transferred to this 
relationship")
-               .build();
-       public static final Relationship REL_FAILURE = new 
Relationship.Builder()
-               .name("failure")
-               .description("FlowFiles will be routed to this relationship if 
they are missing a necessary attribute or the attribute is not in the expected 
format")
-               .build();
-
-       private static final Pattern POSITIVE_LONG_PATTERN = 
Pattern.compile("0*[1-9][0-9]*");
-       private static final String DEFAULT_GROUP_ATTRIBUTE = 
ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
-
-       private final ConcurrentMap<String, Throttle> throttleMap = new 
ConcurrentHashMap<>();
-       private List<PropertyDescriptor> properties;
-       private Set<Relationship> relationships;
-       private final AtomicLong lastThrottleClearTime = new 
AtomicLong(System.currentTimeMillis());
-
-       @Override
-       protected void init(final ProcessorInitializationContext context) {
-               final List<PropertyDescriptor> properties = new ArrayList<>();
-               properties.add(RATE_CONTROL_CRITERIA);
-               properties.add(MAX_RATE);
-               properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
-               properties.add(TIME_PERIOD);
-               properties.add(GROUPING_ATTRIBUTE_NAME);
-               this.properties = Collections.unmodifiableList(properties);
-
-               final Set<Relationship> relationships = new HashSet<>();
-               relationships.add(REL_SUCCESS);
-               this.relationships = Collections.unmodifiableSet(relationships);
-       }
-
-       @Override
-       protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-               return properties;
-       }
-
-       @Override
-       public Set<Relationship> getRelationships() {
-               return relationships;
-       }
-
-       @Override
-       protected Collection<ValidationResult> customValidate(final 
ValidationContext context) {
-               final List<ValidationResult> validationResults = new 
ArrayList<>(super.customValidate(context));
-
-               final Validator rateValidator;
-               switch 
(context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
-                       case DATA_RATE:
-                               rateValidator = 
StandardValidators.DATA_SIZE_VALIDATOR;
-                               break;
-                       case ATTRIBUTE_RATE:
-                               rateValidator = 
StandardValidators.POSITIVE_LONG_VALIDATOR;
-                               final String rateAttr = 
context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
-                               if (rateAttr == null) {
-                                       validationResults.add(new 
ValidationResult.Builder()
-                                               
.subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
-                                               .explanation("<Rate Controlled 
Attribute> property must be set if using <Rate Control Criteria> of 'attribute 
value'")
-                                               .build());
-                               }
-                               break;
-                       case FLOWFILE_RATE:
-                       default:
-                               rateValidator = 
StandardValidators.POSITIVE_LONG_VALIDATOR;
-                               break;
-               }
-
-               final ValidationResult rateResult = 
rateValidator.validate("Maximum Rate", 
context.getProperty(MAX_RATE).getValue(), context);
-               if (!rateResult.isValid()) {
-                       validationResults.add(rateResult);
-               }
-
-               return validationResults;
-       }
-
-       @Override
-       public void onPropertyModified(final PropertyDescriptor descriptor, 
final String oldValue, final String newValue) {
-               super.onPropertyModified(descriptor, oldValue, newValue);
-
-               if (descriptor.equals(RATE_CONTROL_CRITERIA)
-                       || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
-                       || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
-                       || descriptor.equals(TIME_PERIOD)) {
-                       // if the criteria that is being used to determine 
limits/throttles is changed, we must clear our throttle map.
-                       throttleMap.clear();
-               } else if (descriptor.equals(MAX_RATE)) {
-                       final long newRate;
-                       if 
(DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
-                               newRate = DataUnit.parseDataSize(newValue, 
DataUnit.B).longValue();
-                       } else {
-                               newRate = Long.parseLong(newValue);
-                       }
-
-                       for (final Throttle throttle : throttleMap.values()) {
-                               throttle.setMaxRate(newRate);
-                       }
-               }
-       }
-
-       @Override
-       public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
-               final long lastClearTime = lastThrottleClearTime.get();
-               final long throttleExpirationMillis = 
System.currentTimeMillis() - 2 * 
context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
-               if (lastClearTime < throttleExpirationMillis) {
-                       if (lastThrottleClearTime.compareAndSet(lastClearTime, 
System.currentTimeMillis())) {
-                               final Iterator<Map.Entry<String, Throttle>> itr 
= throttleMap.entrySet().iterator();
-                               while (itr.hasNext()) {
-                                       final Map.Entry<String, Throttle> entry 
= itr.next();
-                                       final Throttle throttle = 
entry.getValue();
-                                       if (throttle.tryLock()) {
-                                               try {
-                                                       if 
(throttle.lastUpdateTime() < lastClearTime) {
-                                                               itr.remove();
-                                                       }
-                                               } finally {
-                                                       throttle.unlock();
-                                               }
-                                       }
-                               }
-                       }
-               }
-
-               // TODO: Should periodically clear any Throttle that has not 
been used in more than 2 throttling periods
-               FlowFile flowFile = session.get();
-               if (flowFile == null) {
-                       return;
-               }
-
-               final ProcessorLog logger = getLogger();
-               final long seconds = 
context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
-               final String rateControlAttributeName = 
context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
-               long rateValue;
-               switch 
(context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
-                       case DATA_RATE:
-                               rateValue = flowFile.getSize();
-                               break;
-                       case FLOWFILE_RATE:
-                               rateValue = 1;
-                               break;
-                       case ATTRIBUTE_RATE:
-                               final String attributeValue = 
flowFile.getAttribute(rateControlAttributeName);
-                               if (attributeValue == null) {
-                                       logger.error("routing {} to 'failure' 
because FlowFile is missing required attribute {}", new Object[]{flowFile, 
rateControlAttributeName});
-                                       session.transfer(flowFile, REL_FAILURE);
-                                       return;
-                               }
-
-                               if 
(!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
-                                       logger.error("routing {} to 'failure' 
because FlowFile attribute {} has a value of {}, which is not a positive long",
-                                               new Object[]{flowFile, 
rateControlAttributeName, attributeValue});
-                                       session.transfer(flowFile, REL_FAILURE);
-                                       return;
-                               }
-                               rateValue = Long.parseLong(attributeValue);
-                               break;
-                       default:
-                               throw new AssertionError("<Rate Control 
Criteria> property set to illegal value of " + 
context.getProperty(RATE_CONTROL_CRITERIA).getValue());
-               }
-
-               final String groupingAttributeName = 
context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
-               final String groupName = (groupingAttributeName == null) ? 
DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
-               Throttle throttle = throttleMap.get(groupName);
-               if (throttle == null) {
-                       throttle = new Throttle((int) seconds, 
TimeUnit.SECONDS, logger);
-
-                       final String maxRateValue = 
context.getProperty(MAX_RATE).getValue();
-                       final long newRate;
-                       if 
(DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
-                               newRate = DataUnit.parseDataSize(maxRateValue, 
DataUnit.B).longValue();
-                       } else {
-                               newRate = Long.parseLong(maxRateValue);
-                       }
-                       throttle.setMaxRate(newRate);
-
-                       throttleMap.put(groupName, throttle);
-               }
-
-               throttle.lock();
-               try {
-                       if (throttle.tryAdd(rateValue)) {
-                               logger.info("transferring {} to 'success'", new 
Object[]{flowFile});
-                               session.transfer(flowFile, REL_SUCCESS);
-                       } else {
-                               flowFile = session.penalize(flowFile);
-                               session.transfer(flowFile);
-                       }
-               } finally {
-                       throttle.unlock();
-               }
-       }
-
-       private static class TimestampedLong {
-
-               private final Long value;
-               private final long timestamp = System.currentTimeMillis();
-
-               public TimestampedLong(final Long value) {
-                       this.value = value;
-               }
-
-               public Long getValue() {
-                       return value;
-               }
-
-               public long getTimestamp() {
-                       return timestamp;
-               }
-       }
-
-       private static class RateEntityAccess implements 
EntityAccess<TimestampedLong> {
-
-               @Override
-               public TimestampedLong aggregate(TimestampedLong oldValue, 
TimestampedLong toAdd) {
-                       if (oldValue == null && toAdd == null) {
-                               return new TimestampedLong(0L);
-                       } else if (oldValue == null) {
-                               return toAdd;
-                       } else if (toAdd == null) {
-                               return oldValue;
-                       }
-
-                       return new TimestampedLong(oldValue.getValue() + 
toAdd.getValue());
-               }
-
-               @Override
-               public TimestampedLong createNew() {
-                       return new TimestampedLong(0L);
-               }
-
-               @Override
-               public long getTimestamp(TimestampedLong entity) {
-                       return entity == null ? 0L : entity.getTimestamp();
-               }
-       }
-
-       private static class Throttle extends ReentrantLock {
-
-               private final AtomicLong maxRate = new AtomicLong(1L);
-               private final long timePeriodValue;
-               private final TimeUnit timePeriodUnit;
-               private final TimedBuffer<TimestampedLong> timedBuffer;
-               private final ProcessorLog logger;
-
-               private volatile long penalizationExpired;
-               private volatile long lastUpdateTime;
-
-               public Throttle(final int timePeriod, final TimeUnit unit, 
final ProcessorLog logger) {
-                       this.timePeriodUnit = unit;
-                       this.timePeriodValue = timePeriod;
-                       this.timedBuffer = new TimedBuffer<>(unit, timePeriod, 
new RateEntityAccess());
-                       this.logger = logger;
-               }
-
-               public void setMaxRate(final long maxRate) {
-                       this.maxRate.set(maxRate);
-               }
-
-               public long lastUpdateTime() {
-                       return lastUpdateTime;
-               }
-
-               public boolean tryAdd(final long value) {
-                       final long now = System.currentTimeMillis();
-                       if (penalizationExpired > now) {
-                               return false;
-                       }
-
-                       final long maxRateValue = maxRate.get();
-
-                       final TimestampedLong sum = 
timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, 
timePeriodUnit));
-                       if (sum != null && sum.getValue() >= maxRateValue) {
-                               logger.debug("current sum for throttle is {}, 
so not allowing rate of {} through", new Object[]{sum.getValue(), value});
-                               return false;
-                       }
-
-                       logger.debug("current sum for throttle is {}, so 
allowing rate of {} through",
-                               new Object[]{sum == null ? 0 : sum.getValue(), 
value});
-
-                       final long transferred = timedBuffer.add(new 
TimestampedLong(value)).getValue();
-                       if (transferred > maxRateValue) {
-                               final long amountOver = transferred - 
maxRateValue;
-                               // determine how long it should take to 
transfer 'amountOver' and 'penalize' the Throttle for that long
-                               final long milliDuration = 
TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
-                               final double pct = (double) amountOver / 
(double) maxRateValue;
-                               final long penalizationPeriod = (long) 
(milliDuration * pct);
-                               this.penalizationExpired = now + 
penalizationPeriod;
-                               logger.debug("allowing rate of {} through but 
penalizing Throttle for {} milliseconds", new Object[]{value, 
penalizationPeriod});
-                       }
-
-                       lastUpdateTime = now;
-                       return true;
-               }
-       }
+    public static final String DATA_RATE = "data rate";
+    public static final String FLOWFILE_RATE = "flowfile count";
+    public static final String ATTRIBUTE_RATE = "attribute value";
+
+    public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new 
PropertyDescriptor.Builder()
+        .name("Rate Control Criteria")
+        .description("Indicates the criteria that is used to control the 
throughput rate. Changing this value resets the rate counters.")
+        .required(true)
+        .allowableValues(DATA_RATE, FLOWFILE_RATE, ATTRIBUTE_RATE)
+        .defaultValue(DATA_RATE)
+        .build();
+    public static final PropertyDescriptor MAX_RATE = new 
PropertyDescriptor.Builder()
+        .name("Maximum Rate")
+        .description("The maximum rate at which data should pass through this 
processor. The format of this property is expected to be a "
+            + "positive integer, or a Data Size (such as '1 MB') if Rate 
Control Criteria is set to 'data rate'.")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // validated in 
customValidate b/c dependent on Rate Control Criteria
+        .build();
+    public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new 
PropertyDescriptor.Builder()
+        .name("Rate Controlled Attribute")
+        .description("The name of an attribute whose values build toward the 
rate limit if Rate Control Criteria is set to 'attribute value'. "
+            + "The value of the attribute referenced by this property must be 
a positive long, or the FlowFile will be routed to failure. "
+            + "This value is ignored if Rate Control Criteria is not set to 
'attribute value'. Changing this value resets the rate counters.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+    public static final PropertyDescriptor TIME_PERIOD = new 
PropertyDescriptor.Builder()
+        .name("Time Duration")
+        .description("The amount of time to which the Maximum Data Size and 
Maximum Number of Files pertains. Changing this value resets the rate 
counters.")
+        .required(true)
+        .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+        .defaultValue("1 min")
+        .build();
+    public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new 
PropertyDescriptor.Builder()
+        .name("Grouping Attribute")
+        .description("By default, a single \"throttle\" is used for all 
FlowFiles. If this value is specified, a separate throttle is used for "
+            + "each value specified by the attribute with this name. Changing 
this value resets the rate counters.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles are transferred to this relationship")
+        .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("FlowFiles will be routed to this relationship if they 
are missing a necessary attribute or the attribute is not in the expected 
format")
+        .build();
+
+    private static final Pattern POSITIVE_LONG_PATTERN = 
Pattern.compile("0*[1-9][0-9]*");
+    private static final String DEFAULT_GROUP_ATTRIBUTE = 
ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
+
+    private final ConcurrentMap<String, Throttle> throttleMap = new 
ConcurrentHashMap<>();
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+    private final AtomicLong lastThrottleClearTime = new 
AtomicLong(System.currentTimeMillis());
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(RATE_CONTROL_CRITERIA);
+        properties.add(MAX_RATE);
+        properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
+        properties.add(TIME_PERIOD);
+        properties.add(GROUPING_ATTRIBUTE_NAME);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext context) {
+        final List<ValidationResult> validationResults = new 
ArrayList<>(super.customValidate(context));
+
+        final Validator rateValidator;
+        switch 
(context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+            case DATA_RATE:
+                rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
+                break;
+            case ATTRIBUTE_RATE:
+                rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+                final String rateAttr = 
context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+                if (rateAttr == null) {
+                    validationResults.add(new ValidationResult.Builder()
+                        .subject(RATE_CONTROL_ATTRIBUTE_NAME.getName())
+                        .explanation("<Rate Controlled Attribute> property 
must be set if using <Rate Control Criteria> of 'attribute value'")
+                        .build());
+                }
+                break;
+            case FLOWFILE_RATE:
+            default:
+                rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
+                break;
+        }
+
+        final ValidationResult rateResult = rateValidator.validate("Maximum 
Rate", context.getProperty(MAX_RATE).getValue(), context);
+        if (!rateResult.isValid()) {
+            validationResults.add(rateResult);
+        }
+
+        return validationResults;
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        super.onPropertyModified(descriptor, oldValue, newValue);
+
+        if (descriptor.equals(RATE_CONTROL_CRITERIA)
+            || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME)
+            || descriptor.equals(GROUPING_ATTRIBUTE_NAME)
+            || descriptor.equals(TIME_PERIOD)) {
+            // if the criteria that is being used to determine 
limits/throttles is changed, we must clear our throttle map.
+            throttleMap.clear();
+        } else if (descriptor.equals(MAX_RATE)) {
+            final long newRate;
+            if (DataUnit.DATA_SIZE_PATTERN.matcher(newValue).matches()) {
+                newRate = DataUnit.parseDataSize(newValue, 
DataUnit.B).longValue();
+            } else {
+                newRate = Long.parseLong(newValue);
+            }
+
+            for (final Throttle throttle : throttleMap.values()) {
+                throttle.setMaxRate(newRate);
+            }
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final long lastClearTime = lastThrottleClearTime.get();
+        final long throttleExpirationMillis = System.currentTimeMillis() - 2 * 
context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+        if (lastClearTime < throttleExpirationMillis) {
+            if (lastThrottleClearTime.compareAndSet(lastClearTime, 
System.currentTimeMillis())) {
+                final Iterator<Map.Entry<String, Throttle>> itr = 
throttleMap.entrySet().iterator();
+                while (itr.hasNext()) {
+                    final Map.Entry<String, Throttle> entry = itr.next();
+                    final Throttle throttle = entry.getValue();
+                    if (throttle.tryLock()) {
+                        try {
+                            if (throttle.lastUpdateTime() < lastClearTime) {
+                                itr.remove();
+                            }
+                        } finally {
+                            throttle.unlock();
+                        }
+                    }
+                }
+            }
+        }
+
+        // TODO: Should periodically clear any Throttle that has not been used 
in more than 2 throttling periods
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final ProcessorLog logger = getLogger();
+        final long seconds = 
context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS);
+        final String rateControlAttributeName = 
context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
+        long rateValue;
+        switch 
(context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
+            case DATA_RATE:
+                rateValue = flowFile.getSize();
+                break;
+            case FLOWFILE_RATE:
+                rateValue = 1;
+                break;
+            case ATTRIBUTE_RATE:
+                final String attributeValue = 
flowFile.getAttribute(rateControlAttributeName);
+                if (attributeValue == null) {
+                    logger.error("routing {} to 'failure' because FlowFile is 
missing required attribute {}", new Object[] {flowFile, 
rateControlAttributeName});
+                    session.transfer(flowFile, REL_FAILURE);
+                    return;
+                }
+
+                if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
+                    logger.error("routing {} to 'failure' because FlowFile 
attribute {} has a value of {}, which is not a positive long",
+                        new Object[] {flowFile, rateControlAttributeName, 
attributeValue});
+                    session.transfer(flowFile, REL_FAILURE);
+                    return;
+                }
+                rateValue = Long.parseLong(attributeValue);
+                break;
+            default:
+                throw new AssertionError("<Rate Control Criteria> property set 
to illegal value of " + context.getProperty(RATE_CONTROL_CRITERIA).getValue());
+        }
+
+        final String groupingAttributeName = 
context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
+        final String groupName = (groupingAttributeName == null) ? 
DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName);
+        Throttle throttle = throttleMap.get(groupName);
+        if (throttle == null) {
+            throttle = new Throttle((int) seconds, TimeUnit.SECONDS, logger);
+
+            final String maxRateValue = 
context.getProperty(MAX_RATE).getValue();
+            final long newRate;
+            if (DataUnit.DATA_SIZE_PATTERN.matcher(maxRateValue).matches()) {
+                newRate = DataUnit.parseDataSize(maxRateValue, 
DataUnit.B).longValue();
+            } else {
+                newRate = Long.parseLong(maxRateValue);
+            }
+            throttle.setMaxRate(newRate);
+
+            throttleMap.put(groupName, throttle);
+        }
+
+        throttle.lock();
+        try {
+            if (throttle.tryAdd(rateValue)) {
+                logger.info("transferring {} to 'success'", new Object[] 
{flowFile});
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile);
+            }
+        } finally {
+            throttle.unlock();
+        }
+    }
+
+    private static class TimestampedLong {
+
+        private final Long value;
+        private final long timestamp = System.currentTimeMillis();
+
+        public TimestampedLong(final Long value) {
+            this.value = value;
+        }
+
+        public Long getValue() {
+            return value;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+
+    /**
+     * {@link EntityAccess} implementation that combines {@link TimestampedLong}
+     * entries by adding their values.
+     */
+    private static class RateEntityAccess implements EntityAccess<TimestampedLong> {
+
+        /**
+         * Combines two entries. When both are present the result is a new entry
+         * holding the sum of their values; when only one is present that same
+         * instance is returned; when neither is present a new zero-valued entry
+         * is returned.
+         */
+        @Override
+        public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
+            if (oldValue != null && toAdd != null) {
+                final long combined = oldValue.getValue() + toAdd.getValue();
+                return new TimestampedLong(combined);
+            }
+            if (oldValue != null) {
+                return oldValue;
+            }
+            if (toAdd != null) {
+                return toAdd;
+            }
+            return new TimestampedLong(0L);
+        }
+
+        /**
+         * @return a new entry whose value is zero
+         */
+        @Override
+        public TimestampedLong createNew() {
+            return new TimestampedLong(0L);
+        }
+
+        /**
+         * @return the entry's timestamp, or 0 when {@code entity} is null
+         */
+        @Override
+        public long getTimestamp(TimestampedLong entity) {
+            if (entity == null) {
+                return 0L;
+            }
+            return entity.getTimestamp();
+        }
+    }
+
+    /**
+     * Tracks how much has been let through during a time period and decides
+     * whether another amount may pass. The class extends {@link ReentrantLock},
+     * so the Throttle instance is itself the lock; the calling code in this
+     * file locks the instance around its call to {@link #tryAdd(long)}.
+     */
+    private static class Throttle extends ReentrantLock {
+
+        // Maximum total allowed per time period; starts at 1 until setMaxRate is called.
+        private final AtomicLong maxRate = new AtomicLong(1L);
+        // Length of the time period, expressed in timePeriodUnit.
+        private final long timePeriodValue;
+        private final TimeUnit timePeriodUnit;
+        // Accumulates the admitted amounts; entries are combined by RateEntityAccess.
+        private final TimedBuffer<TimestampedLong> timedBuffer;
+        private final ProcessorLog logger;
+
+        // System.currentTimeMillis()-based instant before which tryAdd rejects everything.
+        private volatile long penalizationExpired;
+        // System.currentTimeMillis() reading taken by the most recent tryAdd call that returned true.
+        private volatile long lastUpdateTime;
+
+        /**
+         * @param timePeriod length of the rate period, in {@code unit}
+         * @param unit unit of {@code timePeriod}
+         * @param logger destination for the debug messages emitted by tryAdd
+         */
+        public Throttle(final int timePeriod, final TimeUnit unit, final 
ProcessorLog logger) {
+            this.timePeriodUnit = unit;
+            this.timePeriodValue = timePeriod;
+            this.timedBuffer = new TimedBuffer<>(unit, timePeriod, new 
RateEntityAccess());
+            this.logger = logger;
+        }
+
+        /**
+         * Replaces the maximum total allowed per time period.
+         */
+        public void setMaxRate(final long maxRate) {
+            this.maxRate.set(maxRate);
+        }
+
+        /**
+         * @return the time, in milliseconds, recorded by the last tryAdd call that
+         *         returned true; 0 if none has
+         */
+        public long lastUpdateTime() {
+            return lastUpdateTime;
+        }
+
+        /**
+         * Attempts to admit {@code value}.
+         *
+         * Returns false, without recording anything, while a penalization is in
+         * effect or when the current aggregate is already at or above the max
+         * rate. Otherwise the value is added to the buffer and true is returned,
+         * even if that addition pushes the total past the max rate; in that case
+         * a penalization proportional to the overage is set so that later calls
+         * are rejected until it expires.
+         *
+         * @param value the amount to admit
+         * @return true if the value was recorded, false if it was rejected
+         */
+        public boolean tryAdd(final long value) {
+            final long now = System.currentTimeMillis();
+            if (penalizationExpired > now) {
+                return false;
+            }
+
+            final long maxRateValue = maxRate.get();
+
+            // NOTE(review): the argument passed here is the period length converted to
+            // milliseconds. TimedBuffer is not visible in this section -- confirm that a
+            // duration (rather than an absolute timestamp) is what getAggregateValue expects.
+            final TimestampedLong sum = 
timedBuffer.getAggregateValue(TimeUnit.MILLISECONDS.convert(timePeriodValue, 
timePeriodUnit));
+            if (sum != null && sum.getValue() >= maxRateValue) {
+                logger.debug("current sum for throttle is {}, so not allowing 
rate of {} through", new Object[] {sum.getValue(), value});
+                return false;
+            }
+
+            logger.debug("current sum for throttle is {}, so allowing rate of 
{} through",
+                new Object[] {sum == null ? 0 : sum.getValue(), value});
+
+            // presumably add() returns the updated running total that includes 'value' --
+            // verify against TimedBuffer.add
+            final long transferred = timedBuffer.add(new 
TimestampedLong(value)).getValue();
+            if (transferred > maxRateValue) {
+                final long amountOver = transferred - maxRateValue;
+                // determine how long it should take to transfer 'amountOver' 
and 'penalize' the Throttle for that long
+                final long milliDuration = 
TimeUnit.MILLISECONDS.convert(timePeriodValue, timePeriodUnit);
+                // Overage as a fraction of the max rate; an overage equal to the max rate
+                // gives 1.0 and therefore a penalty of one full period.
+                // NOTE(review): a max rate of 0 would make this a floating-point division
+                // by zero -- confirm the MAX_RATE validation rules that value out.
+                final double pct = (double) amountOver / (double) maxRateValue;
+                final long penalizationPeriod = (long) (milliDuration * pct);
+                this.penalizationExpired = now + penalizationPeriod;
+                logger.debug("allowing rate of {} through but penalizing 
Throttle for {} milliseconds", new Object[] {value, penalizationPeriod});
+            }
+
+            // Only reached when the value was admitted; rejected calls leave this untouched.
+            lastUpdateTime = now;
+            return true;
+        }
+    }
 }

Reply via email to