http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
new file mode 100644
index 0000000..6d36ad0
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.search.ahocorasick;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.search.SearchTerm;
+
+public class SearchState<T> {
+
+    private Node currentNode;
+    private final Map<SearchTerm<T>, List<Long>> resultMap;
+    private long bytesRead;
+
+    SearchState(final Node rootNode) {
+        resultMap = new HashMap<>(5);
+        currentNode = rootNode;
+        bytesRead = 0L;
+    }
+
+    void incrementBytesRead(final long increment) {
+        bytesRead += increment;
+    }
+
+    void setCurrentNode(final Node curr) {
+        currentNode = curr;
+    }
+
+    public Node getCurrentNode() {
+        return currentNode;
+    }
+
+    public Map<SearchTerm<T>, List<Long>> getResults() {
+        return new HashMap<>(resultMap);
+    }
+
+    void addResult(final SearchTerm matchingTerm) {
+        final List<Long> indexes = (resultMap.containsKey(matchingTerm)) ? 
resultMap.get(matchingTerm) : new ArrayList<Long>(5);
+        indexes.add(bytesRead);
+        resultMap.put(matchingTerm, indexes);
+    }
+
+    public boolean foundMatch() {
+        return !resultMap.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
new file mode 100644
index 0000000..2b95897
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/EntityAccess.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+/**
+ * Strategy used by {@code TimedBuffer} to combine, create and time-stamp the
+ * entities it stores.
+ *
+ * @param <T> type of entity held in the buffer
+ */
+public interface EntityAccess<T> {
+
+    /**
+     * Combines two entities into one. {@code TimedBuffer} invokes this with a
+     * {@code null} first argument when nothing has been accumulated yet, and may
+     * pass a {@code null} second argument, so implementations should tolerate
+     * {@code null} for either parameter.
+     *
+     * @param oldValue value accumulated so far, possibly {@code null}
+     * @param toAdd value to fold in, possibly {@code null}
+     * @return the combined value
+     */
+    T aggregate(T oldValue, T toAdd);
+
+    /**
+     * @return a fresh entity; {@code TimedBuffer} installs it when it resets a bin
+     */
+    T createNew();
+
+    /**
+     * Returns the timestamp associated with the entity. {@code TimedBuffer}
+     * compares the result against values derived from
+     * {@link System#currentTimeMillis()} and may pass {@code null} for a bin that
+     * has never been written.
+     *
+     * @param entity entity to inspect, possibly {@code null}
+     * @return timestamp of the entity (presumably epoch milliseconds, given how
+     *         callers compare it)
+     */
+    long getTimestamp(T entity);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
new file mode 100644
index 0000000..193abc6
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/LongEntityAccess.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+/**
+ * {@link EntityAccess} for {@link TimestampedLong}: aggregation adds the two
+ * wrapped values, and a missing ({@code null}) operand is treated as absent.
+ */
+public class LongEntityAccess implements EntityAccess<TimestampedLong> {
+
+    /**
+     * Sums two values into a new {@link TimestampedLong}.
+     *
+     * @param oldValue first operand, may be {@code null}
+     * @param toAdd second operand, may be {@code null}
+     * @return a new instance holding the sum when both operands are present; the
+     *         single present operand (same instance) when only one is; a new
+     *         zero-valued instance when both are {@code null}
+     */
+    @Override
+    public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
+        final boolean hasOld = oldValue != null;
+        final boolean hasNew = toAdd != null;
+
+        if (hasOld && hasNew) {
+            final long sum = oldValue.getValue() + toAdd.getValue();
+            return new TimestampedLong(sum);
+        }
+        if (hasOld) {
+            return oldValue;
+        }
+        if (hasNew) {
+            return toAdd;
+        }
+        return new TimestampedLong(0L);
+    }
+
+    /**
+     * @return a new zero-valued {@link TimestampedLong}
+     */
+    @Override
+    public TimestampedLong createNew() {
+        return new TimestampedLong(0L);
+    }
+
+    /**
+     * @param entity entity to inspect, may be {@code null}
+     * @return the entity's timestamp, or {@code 0} for a {@code null} entity
+     */
+    @Override
+    public long getTimestamp(TimestampedLong entity) {
+        if (entity == null) {
+            return 0L;
+        }
+        return entity.getTimestamp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
new file mode 100644
index 0000000..dd8e523
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimedBuffer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Circular buffer of time-indexed bins. Each call to {@link #add(Object)} folds
+ * the entity into the bin selected by the current time, and
+ * {@link #getAggregateValue(long)} combines the contents of every bin that has
+ * not expired. Combination, creation and time-stamping of entities are
+ * delegated to the supplied {@link EntityAccess}.
+ *
+ * @param <T> type of entity stored in the buffer
+ */
+public class TimedBuffer<T> {
+
+    private final int numBins;
+    private final EntitySum<T>[] bins;
+    private final EntityAccess<T> entityAccess;
+    private final TimeUnit binPrecision;
+
+    /**
+     * @param binPrecision time unit covered by a single bin
+     * @param numBins configured number of bins; one additional bin is allocated internally
+     * @param accessor strategy used to aggregate, create and time-stamp entities
+     */
+    @SuppressWarnings("unchecked")
+    public TimedBuffer(final TimeUnit binPrecision, final int numBins, final EntityAccess<T> accessor) {
+        this.binPrecision = binPrecision;
+        this.numBins = numBins + 1;
+        this.bins = new EntitySum[this.numBins];
+        for (int i = 0; i < this.numBins; i++) {
+            // Each bin is told the configured count (the parameter), not the allocated count.
+            this.bins[i] = new EntitySum<>(binPrecision, numBins, accessor);
+        }
+        this.entityAccess = accessor;
+    }
+
+    /**
+     * Adds the entity to the bin selected by the current time.
+     *
+     * @param entity entity to fold into the current bin
+     * @return the bin's value after the entity has been aggregated into it
+     */
+    public T add(final T entity) {
+        final EntitySum<T> sum = bins[binIndexFor(System.currentTimeMillis())];
+        return sum.addOrReset(entity);
+    }
+
+    /**
+     * Aggregates the values of all bins that are not expired. The argument only
+     * selects the bin at which iteration starts; every non-expired bin is
+     * included in the result.
+     *
+     * @param sinceEpochMillis time, in epoch milliseconds, whose bin is visited first
+     * @return the aggregate of all non-expired bins, or {@code null} if no bin was included
+     */
+    public T getAggregateValue(final long sinceEpochMillis) {
+        final int startBinIdx = binIndexFor(sinceEpochMillis);
+
+        T total = null;
+        for (int i = 0; i < numBins; i++) {
+            final EntitySum<T> bin = bins[(startBinIdx + i) % numBins];
+
+            if (!bin.isExpired()) {
+                total = entityAccess.aggregate(total, bin.getValue());
+            }
+        }
+
+        return total;
+    }
+
+    /**
+     * Maps a time in epoch milliseconds to a bin index in {@code [0, numBins)}.
+     * Java's {@code %} keeps the sign of the dividend, so a negative time would
+     * otherwise yield a negative index and an ArrayIndexOutOfBoundsException.
+     */
+    private int binIndexFor(final long epochMillis) {
+        final int idx = (int) (binPrecision.convert(epochMillis, TimeUnit.MILLISECONDS) % numBins);
+        return idx < 0 ? idx + numBins : idx;
+    }
+
+    /**
+     * A single bin: holds the aggregated value for one slot of the buffer.
+     */
+    private static class EntitySum<S> {
+
+        private final EntityAccess<S> entityAccess;
+        private final AtomicReference<S> ref = new AtomicReference<>();
+        private final TimeUnit binPrecision;
+        private final int numConfiguredBins;
+
+        public EntitySum(final TimeUnit binPrecision, final int numConfiguredBins, final EntityAccess<S> aggregator) {
+            this.binPrecision = binPrecision;
+            this.entityAccess = aggregator;
+            this.numConfiguredBins = numConfiguredBins;
+        }
+
+        /**
+         * Aggregates the event into the current value, retrying until the
+         * compare-and-set succeeds.
+         */
+        private S add(final S event) {
+            S newValue;
+            S value;
+            do {
+                value = ref.get();
+                newValue = entityAccess.aggregate(value, event);
+            } while (!ref.compareAndSet(value, newValue));
+
+            return newValue;
+        }
+
+        /**
+         * @return the bin's current value; {@code null} if nothing has been stored yet
+         */
+        public S getValue() {
+            return ref.get();
+        }
+
+        /**
+         * @return {@code true} if the bin's value is older than the whole buffer window
+         */
+        public boolean isExpired() {
+            // entityAccess.getTimestamp(curValue) represents the time at which the current value
+            // was last updated. If that is earlier than current time - numConfiguredBins bin
+            // periods, the value is older than the window covered by the buffer.
+            final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(numConfiguredBins, binPrecision);
+
+            final S curValue = ref.get();
+            return (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod);
+        }
+
+        /**
+         * Replaces the value with a fresh entity if it is stale, then aggregates the event.
+         *
+         * @return the bin's value after the event has been aggregated
+         */
+        public S addOrReset(final S event) {
+            // entityAccess.getTimestamp(curValue) represents the time at which the current value
+            // was last updated. If the last value is less than current time - 1 binPrecision, then it
+            // means that we've rolled over and need to reset the value.
+            final long maxExpectedTimePeriod = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(1, binPrecision);
+
+            final S curValue = ref.get();
+            if (entityAccess.getTimestamp(curValue) < maxExpectedTimePeriod) {
+                // The result is deliberately ignored: if the swap fails, the value was changed
+                // concurrently and the event is simply added to whatever is there now.
+                ref.compareAndSet(curValue, entityAccess.createNew());
+            }
+            return add(event);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
new file mode 100644
index 0000000..07d31ea
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/timebuffer/TimestampedLong.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+/**
+ * Immutable pairing of a {@link Long} value with the wall-clock time
+ * ({@link System#currentTimeMillis()}) at which the instance was created.
+ */
+public class TimestampedLong {
+
+    private final long timestamp;
+    private final Long value;
+
+    /**
+     * @param value value to wrap; stored as given, including {@code null}
+     */
+    public TimestampedLong(final Long value) {
+        this.timestamp = System.currentTimeMillis();
+        this.value = value;
+    }
+
+    /**
+     * @return the wrapped value exactly as passed to the constructor
+     */
+    public Long getValue() {
+        return this.value;
+    }
+
+    /**
+     * @return creation time of this instance in epoch milliseconds
+     */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
new file mode 100644
index 0000000..bd30a96
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.remote.io.CompressionOutputStream;
+
+import org.junit.Test;
+
+/**
+ * Round-trip tests: data written through a {@link CompressionOutputStream} must
+ * be read back unchanged through a {@link CompressionInputStream}.
+ */
+public class TestCompressionInputOutputStreams {
+
+    /** Sentence used as payload by the multi-write tests. */
+    private static final String SENTENCE = "The quick brown fox jumps over the lazy dog\r\n\n\n\r";
+
+    /** A single small write with the default buffer size. */
+    @Test
+    public void testSimple() throws IOException {
+        final byte[] original = "Hello, World!".getBytes("UTF-8");
+        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+
+        final CompressionOutputStream compressor = new CompressionOutputStream(sink);
+        compressor.write(original);
+        compressor.flush();
+        compressor.close();
+
+        final byte[] roundTripped = decompress(new ByteArrayInputStream(sink.toByteArray()));
+        assertTrue(Arrays.equals(original, roundTripped));
+    }
+
+    /** One large write (100 sentences) through a stream created with buffer size 8192. */
+    @Test
+    public void testDataLargerThanBuffer() throws IOException {
+        final byte[] original = repeat(SENTENCE, 100).getBytes("UTF-8");
+        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+
+        final CompressionOutputStream compressor = new CompressionOutputStream(sink, 8192);
+        compressor.write(original);
+        compressor.flush();
+        compressor.close();
+
+        final byte[] roundTripped = decompress(new ByteArrayInputStream(sink.toByteArray()));
+        assertTrue(Arrays.equals(original, roundTripped));
+    }
+
+    /** 1024 small writes, flushing after each one. */
+    @Test
+    public void testDataLargerThanBufferWhileFlushing() throws IOException {
+        final byte[] sentenceBytes = SENTENCE.getBytes("UTF-8");
+        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+
+        final CompressionOutputStream compressor = new CompressionOutputStream(sink, 8192);
+        for (int written = 0; written < 1024; written++) {
+            compressor.write(sentenceBytes);
+            compressor.flush();
+        }
+        compressor.close();
+
+        final byte[] expected = repeat(SENTENCE, 1024).getBytes("UTF-8");
+        final byte[] roundTripped = decompress(new ByteArrayInputStream(sink.toByteArray()));
+        assertTrue(Arrays.equals(expected, roundTripped));
+    }
+
+    /**
+     * Two compressed payloads written back to back into the same sink must be
+     * readable as two separate streams from the same underlying input.
+     */
+    @Test
+    public void testSendingMultipleFilesBackToBackOnSameStream() throws IOException {
+        final byte[] sentenceBytes = SENTENCE.getBytes("UTF-8");
+        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+
+        final CompressionOutputStream first = new CompressionOutputStream(sink, 8192);
+        for (int written = 0; written < 512; written++) {
+            first.write(sentenceBytes);
+            first.flush();
+        }
+        first.close();
+
+        final CompressionOutputStream second = new CompressionOutputStream(sink, 8192);
+        for (int written = 0; written < 512; written++) {
+            second.write(sentenceBytes);
+            second.flush();
+        }
+        second.close();
+
+        final byte[] expected = repeat(SENTENCE, 512).getBytes("UTF-8");
+
+        // Both readers share one source so the second starts where the first stopped.
+        final ByteArrayInputStream source = new ByteArrayInputStream(sink.toByteArray());
+
+        final byte[] firstPayload = decompress(source);
+        assertTrue(Arrays.equals(expected, firstPayload));
+
+        final byte[] secondPayload = decompress(source);
+        assertTrue(Arrays.equals(expected, secondPayload));
+    }
+
+    /** Wraps the source in a new {@link CompressionInputStream} and drains it. */
+    private byte[] decompress(final ByteArrayInputStream source) throws IOException {
+        final CompressionInputStream decompressor = new CompressionInputStream(source);
+        return readFully(decompressor);
+    }
+
+    /** Returns {@code text} concatenated {@code times} times. */
+    private static String repeat(final String text, final int times) {
+        final StringBuilder joined = new StringBuilder();
+        for (int i = 0; i < times; i++) {
+            joined.append(text);
+        }
+        return joined.toString();
+    }
+
+    /** Reads the stream until {@code read} reports a negative length and returns everything read. */
+    private byte[] readFully(final InputStream in) throws IOException {
+        final ByteArrayOutputStream collected = new ByteArrayOutputStream();
+        final byte[] chunk = new byte[65536];
+
+        for (int count = in.read(chunk); count >= 0; count = in.read(chunk)) {
+            collected.write(chunk, 0, count);
+        }
+
+        return collected.toByteArray();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingInputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingInputStreamTest.java
 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingInputStreamTest.java
new file mode 100644
index 0000000..27b1493
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/ByteCountingInputStreamTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import junit.framework.TestCase;
+
+/**
+ * Verifies that {@code ByteCountingInputStream#reset()} restores the byte count
+ * recorded at the time of {@code mark()}, including when reset is called twice.
+ */
+public class ByteCountingInputStreamTest extends TestCase {
+
+    public void testReset() throws Exception {
+
+        // Explicit charset so the fixture bytes do not depend on the platform default.
+        final ByteArrayInputStream reader = new ByteArrayInputStream("abcdefghijklmnopqrstuvwxyz".getBytes("UTF-8"));
+        final ByteCountingInputStream bcis = new ByteCountingInputStream(reader);
+        int tmp;
+
+        /* verify first 2 bytes (expected value first, so failure messages read correctly) */
+        tmp = bcis.read();
+        assertEquals('a', tmp);
+        tmp = bcis.read();
+        assertEquals('b', tmp);
+
+        /* save bytes read and place mark */
+        final long bytesAtMark = bcis.getBytesRead();
+        bcis.mark(0);
+
+        /* verify next 2 bytes */
+        tmp = bcis.read();
+        assertEquals('c', tmp);
+        tmp = bcis.read();
+        assertEquals('d', tmp);
+
+        /* verify reset returns to position when mark was placed */
+        bcis.reset();
+        assertEquals(bytesAtMark, bcis.getBytesRead());
+
+        /* verify that the reset bug has been fixed (bug would reduce bytes read count) */
+        bcis.reset();
+        assertEquals(bytesAtMark, bcis.getBytesRead());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
new file mode 100644
index 0000000..52bd8de
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/TestLeakyBucketThrottler.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.stream.io.LeakyBucketStreamThrottler;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Timing-based checks of {@link LeakyBucketStreamThrottler} at a rate of
+ * 1 MB/sec. Ignored by default because the assertions depend on wall-clock time.
+ */
+@Ignore("Tests are time-based")
+public class TestLeakyBucketThrottler {
+
+    private static final int ONE_MB = 1024 * 1024;
+
+    /** Writing 4 MB through a throttled output stream should take about 4 seconds. */
+    @Test(timeout = 10000)
+    public void testOutputStreamInterface() throws IOException {
+        final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(ONE_MB);
+
+        final byte[] payload = new byte[4 * ONE_MB];
+        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        final OutputStream throttledOut = throttler.newThrottledOutputStream(sink);
+
+        final long startedAt = System.currentTimeMillis();
+        throttledOut.write(payload);
+        throttler.close();
+        final long elapsedMillis = System.currentTimeMillis() - startedAt;
+
+        // 4 MB at 1 MB/sec: roughly 4 seconds
+        assertTrue(elapsedMillis > 3000);
+        assertTrue(elapsedMillis < 6000);
+    }
+
+    /** Reading 4 MB through a throttled input stream should take about 4 seconds. */
+    @Test(timeout = 10000)
+    public void testInputStreamInterface() throws IOException {
+        final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(ONE_MB);
+
+        final byte[] payload = new byte[4 * ONE_MB];
+        final ByteArrayInputStream source = new ByteArrayInputStream(payload);
+        final InputStream throttledIn = throttler.newThrottledInputStream(source);
+
+        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        final byte[] chunk = new byte[4096];
+
+        final long startedAt = System.currentTimeMillis();
+        for (int count = throttledIn.read(chunk); count > 0; count = throttledIn.read(chunk)) {
+            sink.write(chunk, 0, count);
+        }
+        throttler.close();
+        final long elapsedMillis = System.currentTimeMillis() - startedAt;
+
+        // 4 MB at 1 MB/sec: roughly 4 seconds
+        assertTrue(elapsedMillis > 3000);
+        assertTrue(elapsedMillis < 6000);
+        sink.close();
+    }
+
+    /** Three threads copy ~2 MB each through the same throttler into one sink. */
+    @Test(timeout = 10000)
+    public void testDirectInterface() throws IOException, InterruptedException {
+        final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(ONE_MB);
+
+        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        final List<Thread> writers = new ArrayList<Thread>();
+        for (int writerIdx = 0; writerIdx < 3; writerIdx++) {
+            writers.add(new WriterThread(writerIdx, throttler, sink));
+        }
+
+        final long startedAt = System.currentTimeMillis();
+        for (final Thread writer : writers) {
+            writer.start();
+        }
+        for (final Thread writer : writers) {
+            writer.join();
+        }
+        final long elapsedMillis = System.currentTimeMillis() - startedAt;
+
+        throttler.close();
+
+        // 3 writers x ~2 MB is about 6 MB at 1 MB/sec; allow between 5 and 7 seconds to
+        // account for busy-ness and for writing slightly more than the limit.
+        assertTrue(elapsedMillis > 5000);
+        assertTrue(elapsedMillis < 7000);
+
+        // every writer's payload must have arrived, and each payload ends in 'A'
+        assertEquals(3 * (2 * ONE_MB + 1), sink.getBufferLength());
+        assertEquals((byte) 'A', sink.getUnderlyingBuffer()[sink.getBufferLength() - 1]);
+    }
+
+    /**
+     * Copies a fixed payload of 2 MB + 1 byte (the last byte is 'A') through the
+     * throttler to the given stream and prints the achieved rate.
+     */
+    private static class WriterThread extends Thread {
+
+        private final int idx;
+        private final byte[] data = new byte[2 * ONE_MB + 1];
+        private final LeakyBucketStreamThrottler throttler;
+        private final OutputStream out;
+
+        public WriterThread(final int idx, final LeakyBucketStreamThrottler throttler, final OutputStream out) {
+            this.idx = idx;
+            this.throttler = throttler;
+            this.out = out;
+            this.data[this.data.length - 1] = (byte) 'A';
+        }
+
+        @Override
+        public void run() {
+            final long startedAt = System.currentTimeMillis();
+            try {
+                throttler.copy(new ByteArrayInputStream(data), out);
+            } catch (IOException e) {
+                e.printStackTrace();
+                return;
+            }
+            final long elapsedMillis = System.currentTimeMillis() - startedAt;
+
+            final long bytesWritten = data.length;
+            final float bytesPerSec = (float) bytesWritten / (float) elapsedMillis * 1000F;
+            System.out.println(idx + " : copied data at a rate of " + bytesPerSec + " bytes/sec");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
new file mode 100644
index 0000000..0838e96
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/TestNaiveSearchRingBuffer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Feeds byte sequences into a {@link NaiveSearchRingBuffer} one byte at a time
+ * and checks that {@code addAndCompare} reports a match only at the position
+ * where the pattern has just been completed.
+ */
+public class TestNaiveSearchRingBuffer {
+
+    /** Pattern shared by both tests. */
+    private static byte[] pattern() {
+        return new byte[]{
+            '\r', '0', 38, 48
+        };
+    }
+
+    /** The pattern is completed by the byte at index 10 of the input. */
+    @Test
+    public void testAddAndCompare() {
+        final byte[] search = new byte[]{
+            '\r', '0', 38, 58, 58, 83, 78, '\r', '0', 38, 48, 83, 92, 78, 4, 38
+        };
+
+        assertMatchesOnlyAt(pattern(), search, 10);
+    }
+
+    /** Same check with bytes above 127 in the input; the pattern completes at index 13. */
+    @Test
+    public void testGetOldestByte() {
+        final byte[] search = new byte[]{
+            '\r', '0', 38, 58, 58, 83, 78, (byte) 223, (byte) 227, (byte) 250, '\r', '0', 38, 48, 83, 92, 78, 4, 38
+        };
+
+        assertMatchesOnlyAt(pattern(), search, 13);
+    }
+
+    /**
+     * Adds every byte of {@code search} to a fresh ring buffer for {@code pattern}
+     * and asserts a match at {@code matchIndex} and no match anywhere else.
+     */
+    private static void assertMatchesOnlyAt(final byte[] pattern, final byte[] search, final int matchIndex) {
+        final NaiveSearchRingBuffer ring = new NaiveSearchRingBuffer(pattern);
+        for (int position = 0; position < search.length; position++) {
+            final boolean matched = ring.addAndCompare(search[position]);
+            if (position != matchIndex) {
+                assertFalse(matched);
+            } else {
+                assertTrue(matched);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java
 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java
new file mode 100644
index 0000000..ec04efb
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestCompoundUpdateMonitor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.util.UUID;
+
+import org.junit.Test;
+
+/**
+ * Exercises {@link CompoundUpdateMonitor} built from a last-modified monitor and an MD5 monitor.
+ */
+public class TestCompoundUpdateMonitor {
+
+    /**
+     * Verifies that states taken from an unchanged file are equal, that changing only the
+     * last-modified timestamp still yields an equal state, and then takes a final state after
+     * the content was rewritten with the timestamp restored.
+     *
+     * @throws IOException if the scratch file cannot be created, written or inspected
+     */
+    @Test
+    public void test() throws IOException {
+        final UpdateMonitor lastModified = new LastModifiedMonitor();
+        final MD5SumMonitor md5 = new MD5SumMonitor();
+        final CompoundUpdateMonitor compound = new CompoundUpdateMonitor(lastModified, md5);
+
+        final File file = new File("target/" + UUID.randomUUID().toString());
+        if (file.exists()) {
+            assertTrue(file.delete());
+        }
+        assertTrue(file.createNewFile());
+
+        try {
+            final Path path = file.toPath();
+
+            // Two states of an untouched file must compare equal.
+            final Object curState = compound.getCurrentState(path);
+            final Object state2 = compound.getCurrentState(path);
+            assertEquals(curState, state2);
+
+            // Only the timestamp changes; the state is still expected to compare equal.
+            file.setLastModified(System.currentTimeMillis() + 1000L);
+            final Object state3 = compound.getCurrentState(path);
+            assertEquals(state2, state3);
+
+            final Object state4 = compound.getCurrentState(path);
+            assertEquals(state3, state4);
+
+            // Rewrite the content, then put the previous timestamp back.
+            final long lastModifiedDate = file.lastModified();
+            try (final OutputStream out = new FileOutputStream(file)) {
+                out.write("Hello".getBytes("UTF-8"));
+            }
+
+            file.setLastModified(lastModifiedDate);
+
+            final Object state5 = compound.getCurrentState(path);
+            // NOTE(review): assertNotSame only compares references, so it cannot detect two
+            // distinct-but-equal state objects - confirm whether an equality check was intended.
+            assertNotSame(state4, state5);
+        } finally {
+            // Best-effort cleanup so repeated runs do not pile up randomly named files under target/.
+            file.delete();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java
 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java
new file mode 100644
index 0000000..3440c16
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/file/monitor/TestSynchronousFileWatcher.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.file.monitor;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+
+import org.junit.Test;
+
+/**
+ * Exercises {@link SynchronousFileWatcher} backed by an {@link MD5SumMonitor}.
+ */
+public class TestSynchronousFileWatcher {
+
+    /**
+     * Checks that {@code checkAndReset()} reports no change while the file is untouched, reports
+     * a change exactly once after the content is rewritten, and then goes quiet again.
+     *
+     * @throws UnsupportedEncodingException if UTF-8 is not available
+     * @throws IOException if the scratch file cannot be written
+     * @throws InterruptedException if one of the pauses is interrupted
+     */
+    @Test
+    public void testIt() throws UnsupportedEncodingException, IOException, InterruptedException {
+        final Path path = Paths.get("target/1.txt");
+        Files.copy(new ByteArrayInputStream("Hello, World!".getBytes("UTF-8")), path, StandardCopyOption.REPLACE_EXISTING);
+        final UpdateMonitor monitor = new MD5SumMonitor();
+
+        // NOTE(review): the 10L argument is presumably a check interval in milliseconds, which
+        // would explain the 30 ms pauses below - confirm against SynchronousFileWatcher.
+        final SynchronousFileWatcher watcher = new SynchronousFileWatcher(path, monitor, 10L);
+        assertFalse(watcher.checkAndReset());
+        Thread.sleep(30L);
+        assertFalse(watcher.checkAndReset());
+
+        // try-with-resources closes the stream without letting a close() failure hide a
+        // failure raised by write() or sync().
+        try (final FileOutputStream fos = new FileOutputStream(path.toFile())) {
+            fos.write("Good-bye, World!".getBytes("UTF-8"));
+            fos.getFD().sync();
+        }
+
+        // The change is reported once and then cleared.
+        assertTrue(watcher.checkAndReset());
+        assertFalse(watcher.checkAndReset());
+
+        Thread.sleep(30L);
+        assertFalse(watcher.checkAndReset());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java
 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java
new file mode 100644
index 0000000..b01b495
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestRingBuffer.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.util.RingBuffer;
+import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
+import org.apache.nifi.util.RingBuffer.IterationDirection;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link RingBuffer}: newest-element lookup, list snapshots, and forward and
+ * backward iteration both before and after more values were added than the buffer holds.
+ */
+public class TestRingBuffer {
+
+    /**
+     * After every add the newest element must be the value just added, including the 11th
+     * add into a buffer created with size 10.
+     */
+    @Test
+    public void testGetNewestElement() {
+        final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10);
+
+        for (int i = 0; i < 11; i++) {
+            ringBuffer.add(i);
+            assertEquals(i, ringBuffer.getNewestElement().intValue());
+        }
+    }
+
+    /**
+     * asList() must be empty for a fresh buffer and otherwise return the added values in
+     * insertion order, both when partially filled (3 values) and when holding 10 values.
+     */
+    @Test
+    public void testAsList() {
+        final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10);
+
+        final List<Integer> emptyList = ringBuffer.asList();
+        assertTrue(emptyList.isEmpty());
+
+        for (int i = 0; i < 3; i++) {
+            ringBuffer.add(i);
+        }
+
+        List<Integer> list = ringBuffer.asList();
+        assertEquals(3, list.size());
+        for (int i = 0; i < 3; i++) {
+            assertEquals(Integer.valueOf(i), list.get(i));
+        }
+
+        for (int i = 3; i < 10; i++) {
+            ringBuffer.add(i);
+        }
+
+        list = ringBuffer.asList();
+        assertEquals(10, list.size());
+        for (int i = 0; i < 10; i++) {
+            assertEquals(Integer.valueOf(i), list.get(i));
+        }
+    }
+
+    /**
+     * Forward iteration over a partially filled buffer must visit the values in insertion
+     * order, exactly once each.
+     */
+    @Test
+    public void testIterateForwards() {
+        final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10);
+
+        final int[] values = new int[]{3, 5, 20, 7};
+        for (final int v : values) {
+            ringBuffer.add(v);
+        }
+
+        final AtomicInteger countHolder = new AtomicInteger(0);
+        ringBuffer.forEach(new ForEachEvaluator<Integer>() {
+            int counter = 0;
+
+            @Override
+            public boolean evaluate(final Integer value) {
+                final int expected = values[counter++];
+                countHolder.incrementAndGet();
+                assertEquals(expected, value.intValue());
+                return true;
+            }
+
+        }, IterationDirection.FORWARD);
+
+        assertEquals(4, countHolder.get());
+    }
+
+    /**
+     * Pushes 16 values through a buffer of size 10 and expects forward iteration to yield the
+     * ten most recent ones: 6..11 followed by 3, 5, 20, 7.
+     */
+    @Test
+    public void testIterateForwardsAfterFull() {
+        final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10);
+
+        for (int i = 0; i < 12; i++) {
+            ringBuffer.add(i);
+        }
+
+        final int[] values = new int[]{3, 5, 20, 7};
+        for (final int v : values) {
+            ringBuffer.add(v);
+        }
+
+        final AtomicInteger countHolder = new AtomicInteger(0);
+        ringBuffer.forEach(new ForEachEvaluator<Integer>() {
+            int counter = 0;
+
+            @Override
+            public boolean evaluate(final Integer value) {
+                countHolder.incrementAndGet();
+
+                if (counter < 6) {
+                    // First six survivors are 6..11 from the initial 0..11 run.
+                    assertEquals(counter + 6, value.intValue());
+                } else {
+                    final int expected = values[counter - 6];
+                    assertEquals(expected, value.intValue());
+                }
+
+                counter++;
+                return true;
+            }
+
+        }, IterationDirection.FORWARD);
+
+        // Guards against the evaluator never being invoked, which would otherwise pass silently.
+        assertEquals(10, countHolder.get());
+    }
+
+    /**
+     * Backward iteration over a partially filled buffer must visit the values newest-first,
+     * exactly once each.
+     */
+    @Test
+    public void testIterateBackwards() {
+        final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10);
+
+        final int[] values = new int[]{3, 5, 20, 7};
+        for (final int v : values) {
+            ringBuffer.add(v);
+        }
+
+        final AtomicInteger countHolder = new AtomicInteger(0);
+        ringBuffer.forEach(new ForEachEvaluator<Integer>() {
+            int counter = 0;
+
+            @Override
+            public boolean evaluate(final Integer value) {
+                final int index = values.length - 1 - counter;
+                final int expected = values[index];
+                countHolder.incrementAndGet();
+
+                assertEquals(expected, value.intValue());
+                counter++;
+                return true;
+            }
+
+        }, IterationDirection.BACKWARD);
+
+        assertEquals(4, countHolder.get());
+    }
+
+    /**
+     * Pushes 16 values through a buffer of size 10 and expects backward iteration to start
+     * with the four most recent values in reverse order (7, 20, 5, 3). Older elements are
+     * visited but their values are not checked.
+     */
+    @Test
+    public void testIterateBackwardsAfterFull() {
+        final RingBuffer<Integer> ringBuffer = new RingBuffer<>(10);
+
+        for (int i = 0; i < 12; i++) {
+            ringBuffer.add(i);
+        }
+
+        final int[] values = new int[]{3, 5, 20, 7};
+        for (final int v : values) {
+            ringBuffer.add(v);
+        }
+
+        final AtomicInteger countHolder = new AtomicInteger(0);
+        ringBuffer.forEach(new ForEachEvaluator<Integer>() {
+            int counter = 0;
+
+            @Override
+            public boolean evaluate(final Integer value) {
+                countHolder.incrementAndGet();
+
+                if (counter < values.length) {
+                    final int index = values.length - 1 - counter;
+                    final int expected = values[index];
+
+                    assertEquals(expected, value.intValue());
+                    counter++;
+                }
+
+                return true;
+            }
+
+        }, IterationDirection.BACKWARD);
+
+        // Guards against the evaluator never being invoked, which would otherwise pass silently.
+        assertEquals(10, countHolder.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java
 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java
new file mode 100644
index 0000000..39ca330
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@link TimedBuffer} using a minimal timestamped long as the stored entity.
+ */
+public class TestTimedBuffer {
+
+    /** How far back, in milliseconds, every aggregate query in these tests looks. */
+    private static final long LOOK_BACK_MILLIS = 30000L;
+
+    /**
+     * A single value must remain visible one second after it was added and must be gone
+     * (null aggregate) once 2.5 seconds have passed.
+     */
+    @Test
+    public void testAgesOff() throws InterruptedException {
+        final LongEntityAccess entityAccess = new LongEntityAccess();
+        final TimedBuffer<TimestampedLong> timedBuffer = new TimedBuffer<>(TimeUnit.SECONDS, 2, entityAccess);
+
+        timedBuffer.add(new TimestampedLong(1000000L));
+
+        final TimestampedLong immediately = timedBuffer.getAggregateValue(System.currentTimeMillis() - LOOK_BACK_MILLIS);
+        assertEquals(1000000L, immediately.getValue().longValue());
+
+        Thread.sleep(1000L);
+        final TimestampedLong afterOneSecond = timedBuffer.getAggregateValue(System.currentTimeMillis() - LOOK_BACK_MILLIS);
+        assertEquals(1000000L, afterOneSecond.getValue().longValue());
+
+        Thread.sleep(1500L);
+        final TimestampedLong afterAgeOff = timedBuffer.getAggregateValue(System.currentTimeMillis() - LOOK_BACK_MILLIS);
+        assertNull(afterAgeOff);
+    }
+
+    /**
+     * Three values added back to back must be summed into one aggregate, which stays visible
+     * after one second and is gone once 2.5 seconds have passed.
+     */
+    @Test
+    public void testAggregation() throws InterruptedException {
+        final LongEntityAccess entityAccess = new LongEntityAccess();
+        final TimedBuffer<TimestampedLong> timedBuffer = new TimedBuffer<>(TimeUnit.SECONDS, 2, entityAccess);
+
+        timedBuffer.add(new TimestampedLong(1000000L));
+        timedBuffer.add(new TimestampedLong(1000000L));
+        timedBuffer.add(new TimestampedLong(25000L));
+
+        final TimestampedLong immediately = timedBuffer.getAggregateValue(System.currentTimeMillis() - LOOK_BACK_MILLIS);
+        assertEquals(2025000L, immediately.getValue().longValue());
+
+        Thread.sleep(1000L);
+        final TimestampedLong afterOneSecond = timedBuffer.getAggregateValue(System.currentTimeMillis() - LOOK_BACK_MILLIS);
+        assertEquals(2025000L, afterOneSecond.getValue().longValue());
+
+        Thread.sleep(1500L);
+        final TimestampedLong afterAgeOff = timedBuffer.getAggregateValue(System.currentTimeMillis() - LOOK_BACK_MILLIS);
+        assertNull(afterAgeOff);
+    }
+
+    /** A long value stamped with the wall-clock time at which the instance was created. */
+    private static class TimestampedLong {
+
+        private final Long value;
+        private final long timestamp = System.currentTimeMillis();
+
+        public TimestampedLong(final Long value) {
+            this.value = value;
+        }
+
+        public Long getValue() {
+            return value;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+
+    /** {@link EntityAccess} that aggregates {@link TimestampedLong}s by adding their values. */
+    private static class LongEntityAccess implements EntityAccess<TimestampedLong> {
+
+        /**
+         * Returns the sum of both values as a new entity. If one side is null the other side
+         * is returned as-is; if both are null a new zero-valued entity is returned.
+         */
+        @Override
+        public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
+            if (oldValue == null) {
+                if (toAdd == null) {
+                    return new TimestampedLong(0L);
+                }
+                return toAdd;
+            }
+
+            if (toAdd == null) {
+                return oldValue;
+            }
+
+            final long sum = oldValue.getValue().longValue() + toAdd.getValue().longValue();
+            return new TimestampedLong(sum);
+        }
+
+        /** Creates a new zero-valued entity. */
+        @Override
+        public TimestampedLong createNew() {
+            return new TimestampedLong(0L);
+        }
+
+        /** Returns the entity's creation timestamp, or 0 for a null entity. */
+        @Override
+        public long getTimestamp(TimestampedLong entity) {
+            if (entity == null) {
+                return 0L;
+            }
+            return entity.getTimestamp();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-utils/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/resources/logback-test.xml 
b/nifi-commons/nifi-utils/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8651d47
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/test/resources/logback-test.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration scan="true" scanPeriod="30 seconds">
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>%-4r [%t] %-5p %c - %m%n</pattern>
+        </encoder>
+    </appender>
+    
+    <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
+    <logger name="org.apache.nifi" level="DEBUG"/>
+    
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+    </root>
+    
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-web-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-web-utils/pom.xml 
b/nifi-commons/nifi-web-utils/pom.xml
new file mode 100644
index 0000000..8c51d7b
--- /dev/null
+++ b/nifi-commons/nifi-web-utils/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-web-utils</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>javax.servlet-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java
 
b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java
new file mode 100644
index 0000000..1eaf366
--- /dev/null
+++ 
b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.util;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import java.net.URI;
+import java.util.Map;
+import javax.ws.rs.core.MediaType;
+
+/**
+ * Convenience wrapper around a Jersey {@link Client} for issuing GET, POST and HEAD requests.
+ * GET and POST requests ask for {@code application/json} responses.
+ */
+public class ClientUtils {
+
+    private final Client client;
+
+    /**
+     * @param client the Jersey client through which every request is issued
+     */
+    public ClientUtils(Client client) {
+        this.client = client;
+    }
+
+    /**
+     * Gets the content at the specified URI.
+     *
+     * @param uri the URI to get the content of
+     * @return the client response resulting from getting the content of the URI
+     * @throws ClientHandlerException if issues occur handling the request
+     * @throws UniformInterfaceException if any interface violations occur
+     */
+    public ClientResponse get(final URI uri) throws ClientHandlerException, UniformInterfaceException {
+        return get(uri, null);
+    }
+
+    /**
+     * Gets the content at the specified URI using the given query parameters.
+     *
+     * @param uri the URI to get the content of
+     * @param queryParams the query parameters to use in the request; may be null for none
+     * @return the client response resulting from getting the content of the URI
+     * @throws ClientHandlerException if issues occur handling the request
+     * @throws UniformInterfaceException if any interface violations occur
+     */
+    public ClientResponse get(final URI uri, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
+        // queryParam() returns a new resource, so the result must be re-assigned on every step
+        WebResource webResource = client.resource(uri);
+        if (queryParams != null) {
+            for (final Map.Entry<String, String> queryEntry : queryParams.entrySet()) {
+                webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
+            }
+        }
+
+        return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    }
+
+    /**
+     * Performs a POST using the specified URI and a JSON entity body.
+     *
+     * @param uri the URI to post to
+     * @param entity the item to post; when null the request is sent without an entity
+     * @return the client response of the request
+     * @throws ClientHandlerException if issues occur handling the request
+     * @throws UniformInterfaceException if any interface violations occur
+     */
+    public ClientResponse post(URI uri, Object entity) throws ClientHandlerException, UniformInterfaceException {
+        // get the resource
+        WebResource.Builder resourceBuilder = client.resource(uri).accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON);
+
+        // include the request entity
+        if (entity != null) {
+            resourceBuilder = resourceBuilder.entity(entity);
+        }
+
+        // perform the request
+        return resourceBuilder.post(ClientResponse.class);
+    }
+
+    /**
+     * Performs a POST using the specified URI and URL-encoded form data.
+     *
+     * @param uri the URI to post to
+     * @param formData the data to post; null or empty sends the request without an entity
+     * @return the client response of the post
+     * @throws ClientHandlerException if issues occur handling the request
+     * @throws UniformInterfaceException if any interface violations occur
+     */
+    public ClientResponse post(URI uri, Map<String, String> formData) throws ClientHandlerException, UniformInterfaceException {
+        // convert the form data; a null map is treated like an empty one because
+        // post(uri, null) resolves to this overload rather than the Object one
+        MultivaluedMapImpl entity = new MultivaluedMapImpl();
+        if (formData != null) {
+            for (final Map.Entry<String, String> formEntry : formData.entrySet()) {
+                entity.add(formEntry.getKey(), formEntry.getValue());
+            }
+        }
+
+        // get the resource
+        WebResource.Builder resourceBuilder = client.resource(uri).accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED);
+
+        // add the form data if necessary
+        if (!entity.isEmpty()) {
+            resourceBuilder = resourceBuilder.entity(entity);
+        }
+
+        // perform the request
+        return resourceBuilder.post(ClientResponse.class);
+    }
+
+    /**
+     * Performs a HEAD request to the specified URI.
+     *
+     * @param uri the uri to request the head of
+     * @return the client response of the request
+     * @throws ClientHandlerException for issues handling the request
+     * @throws UniformInterfaceException for issues with the request
+     */
+    public ClientResponse head(final URI uri) throws ClientHandlerException, UniformInterfaceException {
+        // perform the request
+        WebResource webResource = client.resource(uri);
+        return webResource.head();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java
 
b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java
new file mode 100644
index 0000000..4e7f5b6
--- /dev/null
+++ 
b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.util;
+
+import javax.ws.rs.ext.ContextResolver;
+import javax.ws.rs.ext.Provider;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+/**
+ * JAX-RS context resolver that hands out one shared Jackson {@link ObjectMapper}. The mapper
+ * is set up with a JAXB annotation introspector for both serialization and deserialization
+ * and with {@code Inclusion.NON_NULL} as its serialization inclusion.
+ */
+@Provider
+public class ObjectMapperResolver implements ContextResolver<ObjectMapper> {
+
+    private final ObjectMapper mapper;
+
+    /**
+     * Builds and configures the shared mapper.
+     *
+     * @throws Exception declared by the constructor signature
+     */
+    public ObjectMapperResolver() throws Exception {
+        final AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+        mapper = new ObjectMapper();
+
+        // Derive both replacement configurations from the mapper's current ones before installing either.
+        final SerializationConfig outbound = mapper.getSerializationConfig()
+                .withSerializationInclusion(Inclusion.NON_NULL)
+                .withAnnotationIntrospector(introspector);
+        final DeserializationConfig inbound = mapper.getDeserializationConfig()
+                .withAnnotationIntrospector(introspector);
+
+        mapper.setSerializationConfig(outbound);
+        mapper.setDeserializationConfig(inbound);
+    }
+
+    /**
+     * @param objectType the type a mapper is requested for; not consulted
+     * @return the single shared mapper, regardless of {@code objectType}
+     */
+    @Override
+    public ObjectMapper getContext(Class<?> objectType) {
+        return mapper;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java
 
b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java
new file mode 100644
index 0000000..e27f91c
--- /dev/null
+++ 
b/nifi-commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+
+import org.apache.nifi.security.util.CertificateUtils;
+
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.client.urlconnection.HTTPSProperties;
+
+/**
+ * Common utilities related to web development.
+ *
+ */
+public final class WebUtils {
+
+    private static Logger logger = LoggerFactory.getLogger(WebUtils.class);
+
+    final static ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private WebUtils() {
+    }
+
+    /**
+     * Creates a client for non-secure requests. The client will be created
+     * using the given configuration. Additionally, the client will be
+     * automatically configured for JSON serialization/deserialization.
+     *
+     * @param config client configuration
+     *
+     * @return a Client instance
+     */
+    public static Client createClient(final ClientConfig config) {
+        return createClientHelper(config, null);
+    }
+
+    /**
+     * Creates a client for secure requests. The client will be created using
+     * the given configuration and security context. Additionally, the client
+     * will be automatically configured for JSON serialization/deserialization.
+     *
+     * @param config client configuration
+     * @param ctx security context
+     *
+     * @return a Client instance
+     */
+    public static Client createClient(final ClientConfig config, final 
SSLContext ctx) {
+        return createClientHelper(config, ctx);
+    }
+
+    /**
+     * A helper method for creating clients. The client will be created using
+     * the given configuration and security context. Additionally, the client
+     * will be automatically configured for JSON serialization/deserialization.
+     *
+     * @param config client configuration
+     * @param ctx security context, which may be null for non-secure client
+     * creation
+     *
+     * @return a Client instance
+     */
+    private static Client createClientHelper(final ClientConfig config, final 
SSLContext ctx) {
+
+        final ClientConfig finalConfig = (config == null) ? new 
DefaultClientConfig() : config;
+
+        if (ctx != null && StringUtils.isBlank((String) 
finalConfig.getProperty(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES))) {
+
+            // custom hostname verifier that checks subject alternative names 
against the hostname of the URI
+            final HostnameVerifier hostnameVerifier = new HostnameVerifier() {
+                @Override
+                public boolean verify(final String hostname, final SSLSession 
ssls) {
+
+                    try {
+                        for (final Certificate peerCertificate : 
ssls.getPeerCertificates()) {
+                            if (peerCertificate instanceof X509Certificate) {
+                                final X509Certificate x509Cert = 
(X509Certificate) peerCertificate;
+                                final List<String> subjectAltNames = 
CertificateUtils.getSubjectAlternativeNames(x509Cert);
+                                if 
(subjectAltNames.contains(hostname.toLowerCase())) {
+                                    return true;
+                                }
+                            }
+                        }
+                    } catch (final SSLPeerUnverifiedException | 
CertificateParsingException ex) {
+                        logger.warn("Hostname Verification encountered 
exception verifying hostname due to: " + ex, ex);
+                    }
+
+                    return false;
+                }
+            };
+
+            
finalConfig.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, new 
HTTPSProperties(hostnameVerifier, ctx));
+        }
+
+        finalConfig.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, 
Boolean.TRUE);
+        finalConfig.getClasses().add(ObjectMapperResolver.class);
+
+        // web client for restful request
+        return Client.create(finalConfig);
+
+    }
+
+    /**
+     * Serializes the given object to hexadecimal. Serialization uses Java's
+     * native serialization mechanism, the ObjectOutputStream.
+     *
+     * @param obj an object
+     * @return the serialized object as hex
+     */
+    public static String serializeObjectToHex(final Serializable obj) {
+
+        final ByteArrayOutputStream serializedObj = new 
ByteArrayOutputStream();
+
+        // IOException can never be thrown because we are serializing to an in 
memory byte array
+        try {
+            final ObjectOutputStream oos = new 
ObjectOutputStream(serializedObj);
+            oos.writeObject(obj);
+            oos.close();
+        } catch (final IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+
+        logger.debug(String.format("Serialized object '%s' size: %d", obj, 
serializedObj.size()));
+
+        // hex encode the binary
+        return new String(Hex.encodeHex(serializedObj.toByteArray(), /* 
tolowercase */ true));
+    }
+
+    /**
+     * Deserializes a Java serialized, hex-encoded string into a Java object.
+     * This method is the inverse of the serializeObjectToHex method in this
+     * class.
+     *
+     * @param hexEncodedObject a string
+     * @return the object
+     * @throws ClassNotFoundException if the class could not be found
+     */
+    public static Serializable deserializeHexToObject(final String 
hexEncodedObject) throws ClassNotFoundException {
+
+        // decode the hex encoded object
+        byte[] serializedObj;
+        try {
+            serializedObj = Hex.decodeHex(hexEncodedObject.toCharArray());
+        } catch (final DecoderException de) {
+            throw new IllegalArgumentException(de);
+        }
+
+        // IOException can never be thrown because we are deserializing from 
an in memory byte array
+        try {
+            // deserialize bytes into object
+            ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(serializedObj));
+            return (Serializable) ois.readObject();
+        } catch (final IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-write-ahead-log/.gitignore
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/.gitignore 
b/nifi-commons/nifi-write-ahead-log/.gitignore
new file mode 100755
index 0000000..19f2e00
--- /dev/null
+++ b/nifi-commons/nifi-write-ahead-log/.gitignore
@@ -0,0 +1,2 @@
+/target
+/target

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-write-ahead-log/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-write-ahead-log/pom.xml 
b/nifi-commons/nifi-write-ahead-log/pom.xml
new file mode 100644
index 0000000..7e514a7
--- /dev/null
+++ b/nifi-commons/nifi-write-ahead-log/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-write-ahead-log</artifactId>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+    </dependencies>
+</project>

Reply via email to