Repository: nifi
Updated Branches:
  refs/heads/NIFI-259 [created] 57dadb728


http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
index 5eab704..d3eb515 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java
@@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
@@ -29,22 +28,16 @@ import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.security.UnrecoverableKeyException;
 import java.security.cert.CertificateException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
+import java.util.Map;
 import java.util.Set;
-import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
 import javax.net.ssl.SSLContext;
@@ -76,11 +69,12 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnRemoved;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -202,30 +196,14 @@ public class GetHTTP extends 
AbstractSessionFactoryProcessor {
     public static final String LAST_MODIFIED_DATE_PATTERN_RFC1123 = "EEE, dd 
MMM yyyy HH:mm:ss zzz";
 
     // package access to enable unit testing
-    static final String UNINITIALIZED_LAST_MODIFIED_VALUE;
-
-    private static final String HTTP_CACHE_FILE_PREFIX = "conf/.httpCache-";
-
     static final String ETAG = "ETag";
-
     static final String LAST_MODIFIED = "LastModified";
 
-    static {
-        final SimpleDateFormat sdf = new 
SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US);
-        sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
-        UNINITIALIZED_LAST_MODIFIED_VALUE = sdf.format(new Date(1L));
-    }
-    final AtomicReference<String> lastModifiedRef = new 
AtomicReference<>(UNINITIALIZED_LAST_MODIFIED_VALUE);
-    final AtomicReference<String> entityTagRef = new AtomicReference<>("");
-    // end
 
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> properties;
 
-    private volatile long timeToPersist = 0;
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    private final ReadLock readLock = lock.readLock();
-    private final WriteLock writeLock = lock.writeLock();
+    private final AtomicBoolean clearState = new AtomicBoolean(false);
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -247,16 +225,6 @@ public class GetHTTP extends 
AbstractSessionFactoryProcessor {
         properties.add(PROXY_HOST);
         properties.add(PROXY_PORT);
         this.properties = Collections.unmodifiableList(properties);
-
-        // load etag and lastModified from file
-        final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + 
getIdentifier());
-        try (FileInputStream fis = new FileInputStream(httpCache)) {
-            final Properties props = new Properties();
-            props.load(fis);
-            entityTagRef.set(props.getProperty(ETAG));
-            lastModifiedRef.set(props.getProperty(LAST_MODIFIED));
-        } catch (final IOException swallow) {
-        }
     }
 
     @Override
@@ -271,28 +239,13 @@ public class GetHTTP extends 
AbstractSessionFactoryProcessor {
 
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
-        entityTagRef.set("");
-        lastModifiedRef.set(UNINITIALIZED_LAST_MODIFIED_VALUE);
-    }
-
-    @OnStopped
-    public void onStopped() {
-        final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + 
getIdentifier());
-        try (FileOutputStream fos = new FileOutputStream(httpCache)) {
-            final Properties props = new Properties();
-            props.setProperty(ETAG, entityTagRef.get());
-            props.setProperty(LAST_MODIFIED, lastModifiedRef.get());
-            props.store(fos, "GetHTTP file modification values");
-        } catch (final IOException swallow) {
-        }
-
+        clearState.set(true);
     }
 
-    @OnRemoved
-    public void onRemoved() {
-        final File httpCache = new File(HTTP_CACHE_FILE_PREFIX + 
getIdentifier());
-        if (httpCache.exists()) {
-            httpCache.delete();
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException {
+        if (clearState.getAndSet(false)) {
+            context.getStateManager().clear(Scope.LOCAL);
         }
     }
 
@@ -444,8 +397,20 @@ public class GetHTTP extends 
AbstractSessionFactoryProcessor {
             final HttpGet get = new HttpGet(url);
             get.setConfig(requestConfigBuilder.build());
 
-            get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModifiedRef.get());
-            get.addHeader(HEADER_IF_NONE_MATCH, entityTagRef.get());
+            try {
+                final StateMap stateMap = 
context.getStateManager().getState(Scope.LOCAL);
+                final String lastModified = stateMap.get(LAST_MODIFIED);
+                if (lastModified != null) {
+                    get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModified);
+                }
+
+                final String etag = stateMap.get(ETAG);
+                if (etag != null) {
+                    get.addHeader(HEADER_IF_NONE_MATCH, etag);
+                }
+            } catch (final IOException ioe) {
+                throw new ProcessException(ioe);
+            }
 
             final String accept = 
context.getProperty(ACCEPT_CONTENT_TYPE).getValue();
             if (accept != null) {
@@ -492,41 +457,20 @@ public class GetHTTP extends 
AbstractSessionFactoryProcessor {
                 session.transfer(flowFile, REL_SUCCESS);
                 logger.info("Successfully received {} from {} at a rate of {}; 
transferred to success", new Object[]{flowFile, url, dataRate});
                 session.commit();
+
+                final Map<String, String> updatedState = new HashMap<>(2);
                 final Header lastModified = 
response.getFirstHeader(HEADER_LAST_MODIFIED);
                 if (lastModified != null) {
-                    lastModifiedRef.set(lastModified.getValue());
+                    updatedState.put(LAST_MODIFIED, lastModified.getValue());
                 }
 
                 final Header etag = response.getFirstHeader(HEADER_ETAG);
                 if (etag != null) {
-                    entityTagRef.set(etag.getValue());
+                    updatedState.put(ETAG, etag.getValue());
                 }
-                if ((etag != null || lastModified != null) && 
readLock.tryLock()) {
-                    try {
-                        if (timeToPersist < System.currentTimeMillis()) {
-                            readLock.unlock();
-                            writeLock.lock();
-                            try {
-                                if (timeToPersist < 
System.currentTimeMillis()) {
-                                    timeToPersist = System.currentTimeMillis() 
+ PERSISTENCE_INTERVAL_MSEC;
-                                    final File httpCache = new 
File(HTTP_CACHE_FILE_PREFIX + getIdentifier());
-                                    try (FileOutputStream fos = new 
FileOutputStream(httpCache)) {
-                                        final Properties props = new 
Properties();
-                                        props.setProperty(ETAG, 
entityTagRef.get());
-                                        props.setProperty(LAST_MODIFIED, 
lastModifiedRef.get());
-                                        props.store(fos, "GetHTTP file 
modification values");
-                                    } catch (final IOException e) {
-                                        getLogger().error("Failed to persist 
ETag and LastMod due to " + e, e);
-                                    }
-                                }
-                            } finally {
-                                readLock.lock();
-                                writeLock.unlock();
-                            }
-                        }
-                    } finally {
-                        readLock.unlock();
-                    }
+
+                if (!updatedState.isEmpty()) {
+                    context.getStateManager().setState(updatedState, 
Scope.LOCAL);
                 }
             } catch (final IOException e) {
                 context.yield();

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
index ba84939..37316b6 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestAbstractListProcessor.java
@@ -26,8 +26,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@@ -35,6 +37,7 @@ import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.standard.util.ListableEntity;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
@@ -96,7 +99,7 @@ public class TestAbstractListProcessor {
     }
 
     @Test
-    public void testStateStoredInDistributedService() throws 
InitializationException {
+    public void testStateStoredInClusterStateManagement() throws 
InitializationException {
         final ConcreteListProcessor proc = new ConcreteListProcessor();
         final TestRunner runner = TestRunners.newTestRunner(proc);
         final DistributedCache cache = new DistributedCache();
@@ -109,7 +112,32 @@ public class TestAbstractListProcessor {
         proc.addEntity("name", "id", 1492L);
         runner.run();
 
-        assertEquals(1, cache.stored.size());
+        final Map<String, String> expectedState = new HashMap<>();
+        expectedState.put(AbstractListProcessor.TIMESTAMP, "1492");
+        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", 
"id");
+        runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
+    }
+
+    @Test
+    public void testStateMigrated() throws InitializationException {
+        final ConcreteListProcessor proc = new ConcreteListProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        final DistributedCache cache = new DistributedCache();
+        runner.addControllerService("cache", cache);
+        runner.enableControllerService(cache);
+        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, 
"cache");
+
+        final String serviceState = 
"{\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}";
+        final String cacheKey = runner.getProcessor().getIdentifier() + 
".lastListingTime./path";
+        cache.stored.put(cacheKey, serviceState);
+
+        runner.run();
+
+        final MockStateManager stateManager = runner.getStateManager();
+        final Map<String, String> expectedState = new HashMap<>();
+        expectedState.put(AbstractListProcessor.TIMESTAMP, "1492");
+        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", 
"id");
+        stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
     }
 
     @Test
@@ -174,7 +202,7 @@ public class TestAbstractListProcessor {
 
         @Override
         protected File getPersistenceFile() {
-            return new File("target/ListProcessor-local-state.json");
+            return new File("target/ListProcessor-local-state-" + 
UUID.randomUUID().toString() + ".json");
         }
 
         public void addEntity(final String name, final String identifier, 
final long timestamp) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
index 12a5cd4..e09d6f0 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java
@@ -16,16 +16,12 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.lang3.SerializationException;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -34,25 +30,21 @@ import 
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockControllerServiceInitializationContext;
 import org.apache.nifi.util.MockProcessorLog;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TestDetectDuplicate {
 
-    private static Logger LOGGER;
-
     static {
         System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
         System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
         System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
         
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.DetectDuplicate",
 "debug");
         
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestDetectDuplicate",
 "debug");
-        LOGGER = LoggerFactory.getLogger(TestDetectDuplicate.class);
     }
 
     @Test
@@ -114,7 +106,7 @@ public class TestDetectDuplicate {
 
         final DistributedMapCacheClientImpl client = new 
DistributedMapCacheClientImpl();
         final ComponentLog logger = new MockProcessorLog("client", client);
-        final MockControllerServiceInitializationContext clientInitContext = 
new MockControllerServiceInitializationContext(client, "client", logger);
+        final MockControllerServiceInitializationContext clientInitContext = 
new MockControllerServiceInitializationContext(client, "client", logger, new 
MockStateManager());
         client.initialize(clientInitContext);
 
         return client;
@@ -154,6 +146,7 @@ public class TestDetectDuplicate {
         }
 
         @Override
+        @SuppressWarnings("unchecked")
         public <K, V> V getAndPutIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer,
                 final Deserializer<V> valueDeserializer) throws IOException {
             if (exists) {
@@ -183,33 +176,4 @@ public class TestDetectDuplicate {
         public <K, V> void put(final K key, final V value, final Serializer<K> 
keySerializer, final Serializer<V> valueSerializer) throws IOException {
         }
     }
-
-    private static class StringSerializer implements Serializer<String> {
-
-        @Override
-        public void serialize(final String value, final OutputStream output) 
throws SerializationException, IOException {
-            output.write(value.getBytes(StandardCharsets.UTF_8));
-        }
-    }
-
-    private static void deleteRecursively(final File dataFile) throws 
IOException {
-        if (dataFile == null || !dataFile.exists()) {
-            return;
-        }
-
-        final File[] children = dataFile.listFiles();
-        for (final File child : children) {
-            if (child.isDirectory()) {
-                deleteRecursively(child);
-            } else {
-                for (int i = 0; i < 100 && child.exists(); i++) {
-                    child.delete();
-                }
-
-                if (child.exists()) {
-                    throw new IOException("Could not delete " + 
dataFile.getAbsolutePath());
-                }
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
index 5aec796..9c1d083 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetHTTP.java
@@ -16,24 +16,18 @@
  */
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 
+import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.StandardSSLContextService;
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.MockProcessContext;
-import org.apache.nifi.util.MockProcessorInitializationContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.eclipse.jetty.servlet.ServletHandler;
@@ -97,14 +91,14 @@ public class TestGetHTTP {
             controller.setProperty(GetHTTP.FILENAME, "testFile");
             controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, 
"application/json");
 
-            GetHTTP getHTTPProcessor = (GetHTTP) controller.getProcessor();
-            assertEquals("", getHTTPProcessor.entityTagRef.get());
-            assertEquals("Thu, 01 Jan 1970 00:00:00 GMT", 
getHTTPProcessor.lastModifiedRef.get());
+            controller.getStateManager().assertStateNotSet(GetHTTP.ETAG, 
Scope.LOCAL);
+            
controller.getStateManager().assertStateNotSet(GetHTTP.LAST_MODIFIED, 
Scope.LOCAL);
             controller.run(2);
 
             // verify the lastModified and entityTag are updated
-            assertFalse("".equals(getHTTPProcessor.entityTagRef.get()));
-            assertFalse("Thu, 01 Jan 1970 00:00:00 
GMT".equals(getHTTPProcessor.lastModifiedRef.get()));
+            controller.getStateManager().assertStateNotEquals(GetHTTP.ETAG, 
"", Scope.LOCAL);
+            
controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED, "Thu, 
01 Jan 1970 00:00:00 GMT", Scope.LOCAL);
+
             // ran twice, but got one...which is good
             controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
 
@@ -141,18 +135,18 @@ public class TestGetHTTP {
             controller.run(2);
             // ran twice, got 1...but should have new cached etag
             controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
-            assertEquals("1", getHTTPProcessor.entityTagRef.get());
+            controller.getStateManager().assertStateEquals(GetHTTP.ETAG, "1", 
Scope.LOCAL);
             controller.clearTransferState();
 
             // turn off checking for Etag, turn on checking for lastModified, 
but change value
             RESTServiceContentModified.IGNORE_LAST_MODIFIED = false;
             RESTServiceContentModified.IGNORE_ETAG = true;
             RESTServiceContentModified.modificationDate = 
System.currentTimeMillis() / 1000 * 1000 + 5000;
-            String lastMod = getHTTPProcessor.lastModifiedRef.get();
+            String lastMod = 
controller.getStateManager().getState(Scope.LOCAL).get(GetHTTP.LAST_MODIFIED);
             controller.run(2);
             // ran twice, got 1...but should have new cached etag
             controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
-            
assertFalse(lastMod.equals(getHTTPProcessor.lastModifiedRef.get()));
+            
controller.getStateManager().assertStateNotEquals(GetHTTP.LAST_MODIFIED, 
lastMod, Scope.LOCAL);
             controller.clearTransferState();
 
             // shutdown web service
@@ -161,90 +155,6 @@ public class TestGetHTTP {
         }
     }
 
-    @Test
-    public void testPersistEtagLastMod() throws Exception {
-        // delete the config file
-        File confDir = new File("conf");
-        File[] files = confDir.listFiles();
-        for (File file : files) {
-            assertTrue("Failed to delete " + file.getName(), file.delete());
-        }
-
-        // set up web service
-        ServletHandler handler = new ServletHandler();
-        handler.addServletWithMapping(RESTServiceContentModified.class, "/*");
-
-        // create the service
-        TestServer server = new TestServer();
-        server.addHandler(handler);
-
-        try {
-            server.startServer();
-
-            // get the server url
-            String destination = server.getUrl();
-
-            // set up NiFi mock controller
-            controller = TestRunners.newTestRunner(GetHTTP.class);
-            controller.setProperty(GetHTTP.CONNECTION_TIMEOUT, "5 secs");
-            controller.setProperty(GetHTTP.FILENAME, "testFile");
-            controller.setProperty(GetHTTP.URL, destination);
-            controller.setProperty(GetHTTP.ACCEPT_CONTENT_TYPE, 
"application/json");
-
-            GetHTTP getHTTPProcessor = (GetHTTP) controller.getProcessor();
-
-            assertEquals("", getHTTPProcessor.entityTagRef.get());
-            assertEquals("Thu, 01 Jan 1970 00:00:00 GMT", 
getHTTPProcessor.lastModifiedRef.get());
-            controller.run(2);
-
-            // verify the lastModified and entityTag are updated
-            String etag = getHTTPProcessor.entityTagRef.get();
-            assertFalse("".equals(etag));
-            String lastMod = getHTTPProcessor.lastModifiedRef.get();
-            assertFalse("Thu, 01 Jan 1970 00:00:00 GMT".equals(lastMod));
-            // ran twice, but got one...which is good
-            controller.assertTransferCount(GetHTTP.REL_SUCCESS, 1);
-            controller.clearTransferState();
-
-            files = confDir.listFiles();
-            assertEquals(1, files.length);
-            File file = files[0];
-            assertTrue(file.exists());
-            Properties props = new Properties();
-            FileInputStream fis = new FileInputStream(file);
-            props.load(fis);
-            fis.close();
-            assertEquals(etag, props.getProperty(GetHTTP.ETAG));
-            assertEquals(lastMod, props.getProperty(GetHTTP.LAST_MODIFIED));
-
-            ProcessorInitializationContext pic = new 
MockProcessorInitializationContext(controller.getProcessor(), 
(MockProcessContext) controller.getProcessContext());
-            // init causes read from file
-            getHTTPProcessor.init(pic);
-            assertEquals(etag, getHTTPProcessor.entityTagRef.get());
-            assertEquals(lastMod, getHTTPProcessor.lastModifiedRef.get());
-            controller.run(2);
-            // ran twice, got none...which is good
-            controller.assertTransferCount(GetHTTP.REL_SUCCESS, 0);
-            controller.clearTransferState();
-            files = confDir.listFiles();
-            assertEquals(1, files.length);
-            file = files[0];
-            assertTrue(file.exists());
-            props = new Properties();
-            fis = new FileInputStream(file);
-            props.load(fis);
-            fis.close();
-            assertEquals(etag, props.getProperty(GetHTTP.ETAG));
-            assertEquals(lastMod, props.getProperty(GetHTTP.LAST_MODIFIED));
-
-            getHTTPProcessor.onRemoved();
-            assertFalse(file.exists());
-
-            // shutdown web service
-        } finally {
-            server.shutdownServer();
-        }
-    }
 
     @Test
     public final void testUserAgent() throws Exception {

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java
index 2eac3f2..3b70dc4 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestRouteOnAttribute.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
@@ -38,7 +39,7 @@ public class TestRouteOnAttribute {
     @Test
     public void testInvalidOnMisconfiguredProperty() {
         final RouteOnAttribute proc = new RouteOnAttribute();
-        final MockProcessContext ctx = new MockProcessContext(proc);
+        final MockProcessContext ctx = new MockProcessContext(proc, new 
MockStateManager());
         final ValidationResult validationResult = ctx.setProperty("RouteA", 
"${a:equals('b')"); // Missing closing brace
         assertFalse(validationResult.isValid());
     }
@@ -46,7 +47,7 @@ public class TestRouteOnAttribute {
     @Test
     public void testInvalidOnNonBooleanProperty() {
         final RouteOnAttribute proc = new RouteOnAttribute();
-        final MockProcessContext ctx = new MockProcessContext(proc);
+        final MockProcessContext ctx = new MockProcessContext(proc, new 
MockStateManager());
         final ValidationResult validationResult = ctx.setProperty("RouteA", 
"${a:length()"); // Should be boolean
         assertFalse(validationResult.isValid());
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2af004d..954733f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,8 @@ language governing permissions and limitations under the 
License. -->
     <repositories>
         <repository>
             <id>central</id>
-            <!-- This should be at top, it makes maven try the central repo 
first and then others and hence faster dep resolution -->
+            <!-- This should be at top, it makes maven try the central repo 
+                first and then others and hence faster dep resolution -->
             <name>Maven Repository</name>
             <url>https://repo1.maven.org/maven2</url>
             <releases>
@@ -578,7 +579,7 @@ language governing permissions and limitations under the 
License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.activemq</groupId>
-                               <artifactId>activemq-broker</artifactId>
+                <artifactId>activemq-broker</artifactId>
                 <version>5.12.1</version>
                 <scope>tests</scope>
             </dependency>
@@ -749,6 +750,27 @@ language governing permissions and limitations under the 
License. -->
                 <version>1.3.1</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.zookeeper</groupId>
+                <artifactId>zookeeper</artifactId>
+                <version>3.4.7</version>
+            </dependency>
+            
+            <!-- Test Dependencies for testing interactions with ZooKeeper -->
+            <dependency>
+                <groupId>org.apache.curator</groupId>
+                <artifactId>curator-test</artifactId>
+                <version>3.0.0</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.testng</groupId>
+                <artifactId>testng</artifactId>
+                <version>6.8.8</version>
+                <scope>test</scope>
+            </dependency>
+            
+            <!-- NiFi modules -->
+            <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-api</artifactId>
                 <version>0.4.2-SNAPSHOT</version>
@@ -1283,71 +1305,100 @@ language governing permissions and limitations under 
the License. -->
                             <!-- Checks for whitespace -->
                             <!-- See 
http://checkstyle.sf.net/config_whitespace.html -->
                             <module name="FileTabCharacter">
-                                <property name="eachLine" value="true" />
+                                <property name="eachLine"
+                                    value="true" />
                             </module>
                             <module name="TreeWalker">
                                 <module name="RegexpSinglelineJava">
-                                    <property name="format" value="\s+$" />
-                                    <property name="message" value="Line has 
trailing whitespace." />
+                                    <property name="format"
+                                        value="\s+$" />
+                                    <property name="message"
+                                        value="Line has trailing whitespace." 
/>
                                 </module>
                                 <module name="RegexpSinglelineJava">
-                                    <property name="format" 
value="[@]see\s+[{][@]link" />
-                                    <property name="message" value="Javadoc 
@see does not need @link: pick one or the other." />
+                                    <property name="format"
+                                        value="[@]see\s+[{][@]link" />
+                                    <property name="message"
+                                        value="Javadoc @see does not need 
@link: pick one or the other." />
                                 </module>
                                 <module name="OuterTypeFilename" />
                                 <module name="LineLength">
                                     <!-- needs extra, because Eclipse 
formatter ignores the ending left 
                                     brace -->
                                     <property name="max" value="200" />
-                                    <property name="ignorePattern" 
value="^package.*|^import.*|a href|href|http://|https://|ftp://"; />
+                                    <property name="ignorePattern"
+                                        value="^package.*|^import.*|a 
href|href|http://|https://|ftp://"; />
                                 </module>
                                 <module name="AvoidStarImport" />
                                 <module name="UnusedImports">
-                                    <property name="processJavadoc" 
value="true" />
+                                    <property name="processJavadoc"
+                                        value="true" />
                                 </module>
                                 <module name="NoLineWrap" />
                                 <module name="LeftCurly">
-                                    <property name="maxLineLength" value="160" 
/>
+                                    <property name="maxLineLength"
+                                        value="160" />
                                 </module>
                                 <module name="RightCurly" />
                                 <module name="RightCurly">
-                                    <property name="option" value="alone" />
-                                    <property name="tokens" value="CLASS_DEF, 
METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, 
INSTANCE_INIT" />
+                                    <property name="option"
+                                        value="alone" />
+                                    <property name="tokens"
+                                        value="CLASS_DEF, METHOD_DEF, 
CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT" />
                                 </module>
                                 <module name="SeparatorWrap">
-                                    <property name="tokens" value="DOT" />
-                                    <property name="option" value="nl" />
+                                    <property name="tokens"
+                                        value="DOT" />
+                                    <property name="option"
+                                        value="nl" />
                                 </module>
                                 <module name="SeparatorWrap">
-                                    <property name="tokens" value="COMMA" />
-                                    <property name="option" value="EOL" />
+                                    <property name="tokens"
+                                        value="COMMA" />
+                                    <property name="option"
+                                        value="EOL" />
                                 </module>
                                 <module name="PackageName">
-                                    <property name="format" 
value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" />
+                                    <property name="format"
+                                        value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" 
/>
                                 </module>
                                 <module name="MethodTypeParameterName">
-                                    <property name="format" 
value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" />
+                                    <property name="format"
+                                        
value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" />
                                 </module>
                                 <module name="MethodParamPad" />
                                 <module name="OperatorWrap">
-                                    <property name="option" value="NL" />
-                                    <property name="tokens" value="BAND, BOR, 
BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, 
MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " />
+                                    <property name="option"
+                                        value="NL" />
+                                    <property name="tokens"
+                                        value="BAND, BOR, BSR, BXOR, DIV, 
EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, 
QUESTION, SL, SR, STAR " />
                                 </module>
                                 <module name="AnnotationLocation">
-                                    <property name="tokens" value="CLASS_DEF, 
INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" />
+                                    <property name="tokens"
+                                        value="CLASS_DEF, INTERFACE_DEF, 
ENUM_DEF, METHOD_DEF, CTOR_DEF" />
                                 </module>
                                 <module name="AnnotationLocation">
-                                    <property name="tokens" 
value="VARIABLE_DEF" />
-                                    <property 
name="allowSamelineMultipleAnnotations" value="true" />
+                                    <property name="tokens"
+                                        value="VARIABLE_DEF" />
+                                    <property
+                                        name="allowSamelineMultipleAnnotations"
+                                        value="true" />
                                 </module>
                                 <module name="NonEmptyAtclauseDescription" />
                                 <module name="JavadocMethod">
-                                    <property name="allowMissingJavadoc" 
value="true" />
-                                    <property name="allowMissingParamTags" 
value="true" />
-                                    <property name="allowMissingThrowsTags" 
value="true" />
-                                    <property name="allowMissingReturnTag" 
value="true" />
-                                    <property name="allowedAnnotations" 
value="Override,Test,BeforeClass,AfterClass,Before,After" />
-                                    <property 
name="allowThrowsTagsForSubclasses" value="true" />
+                                    <property name="allowMissingJavadoc"
+                                        value="true" />
+                                    <property name="allowMissingParamTags"
+                                        value="true" />
+                                    <property name="allowMissingThrowsTags"
+                                        value="true" />
+                                    <property name="allowMissingReturnTag"
+                                        value="true" />
+                                    <property name="allowedAnnotations"
+                                        
value="Override,Test,BeforeClass,AfterClass,Before,After" />
+                                    <property
+                                        name="allowThrowsTagsForSubclasses"
+                                        value="true" />
                                 </module>
                                 <module name="SingleLineJavadoc" />
                             </module>
@@ -1424,12 +1475,12 @@ language governing permissions and limitations under 
the License. -->
             </build>
         </profile>
         <profile>
-            <!-- This profile will disable DocLint which performs strict 
JavaDoc
-            processing which was introduced in JDK 8. These are technically 
errors
-            in the JavaDoc which we need to eventually address. However, if a 
release
-            is performed using JDK 8, the JavaDoc generation would fail. By 
activating
-            this profile when running on JDK 8 we can ensure the JavaDocs 
continue
-            to generate successfully -->
+            <!-- This profile will disable DocLint which performs strict 
+                JavaDoc processing which was introduced in JDK 8. These are 
technically errors 
+                in the JavaDoc which we need to eventually address. However, 
if a release 
+                is performed using JDK 8, the JavaDoc generation would fail. 
By activating 
+                this profile when running on JDK 8 we can ensure the JavaDocs 
continue to 
+                generate successfully -->
             <id>disable-doclint</id>
             <activation>
                 <jdk>1.8</jdk>

Reply via email to