NIFI-259: Bug fixes

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b07e13a1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b07e13a1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b07e13a1

Branch: refs/heads/master
Commit: b07e13a1d82af20919349387841833dc2daa16c7
Parents: 8f9c0b9
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Jan 21 13:44:44 2016 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Thu Jan 21 13:44:44 2016 -0500

----------------------------------------------------------------------
 .../cluster/manager/impl/WebClusterManager.java | 72 +++++++++++---------
 .../zookeeper/ZooKeeperStateProvider.java       |  9 ++-
 .../zookeeper/TestZooKeeperStateProvider.java   |  7 +-
 .../web/dao/impl/StandardComponentStateDAO.java |  4 +-
 4 files changed, 53 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b07e13a1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 1ebf010..670736c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -421,40 +421,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         }
         componentStatusSnapshotMillis = snapshotMillis;
 
-        
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new 
Runnable() {
-            @Override
-            public void run() {
-                readLock.lock();
-                try {
-                    for (final Node node : nodes) {
-                        if (Status.CONNECTED.equals(node.getStatus())) {
-                            ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
-                            if (statusRepository == null) {
-                                statusRepository = 
createComponentStatusRepository();
-                                
componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
-                            }
-
-                            // ensure this node has a payload
-                            if (node.getHeartbeat() != null && 
node.getHeartbeatPayload() != null) {
-                                // if nothing has been captured or the current 
heartbeat is newer, capture it - comparing the heatbeat created timestamp
-                                // is safe since its marked as XmlTransient so 
we're assured that its based off the same clock that created the last capture 
date
-                                if (statusRepository.getLastCaptureDate() == 
null || node.getHeartbeat().getCreatedTimestamp() > 
statusRepository.getLastCaptureDate().getTime()) {
-                                    
statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus());
-                                }
-                            }
-                        }
-                    }
-                } catch (final Throwable t) {
-                    logger.warn("Unable to capture component metrics from Node 
heartbeats: " + t);
-                    if (logger.isDebugEnabled()) {
-                        logger.warn("", t);
-                    }
-                } finally {
-                    readLock.unlock("capture component metrics from node 
heartbeats");
-                }
-            }
-        }, componentStatusSnapshotMillis, componentStatusSnapshotMillis, 
TimeUnit.MILLISECONDS);
-
         remoteInputPort = properties.getRemoteInputPort();
         if (remoteInputPort == null) {
             remoteSiteListener = null;
@@ -496,6 +462,7 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
         processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, 
new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
         processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 
10);
         processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
+        processScheduler.scheduleFrameworkTask(new CaptureComponentMetrics(), 
"Capture Component Metrics", componentStatusSnapshotMillis, 
componentStatusSnapshotMillis, TimeUnit.MILLISECONDS);
 
         controllerServiceProvider = new 
StandardControllerServiceProvider(processScheduler, bulletinRepository, 
stateManagerProvider);
     }
@@ -4595,4 +4562,41 @@ public class WebClusterManager implements 
HttpClusterManager, ProtocolHandler, C
     public Set<String> getControllerServiceIdentifiers(final Class<? extends 
ControllerService> serviceType) {
         return 
controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
     }
+
+    /**
+     * Captures snapshots of components' metrics
+     */
+    private class CaptureComponentMetrics implements Runnable {
+        @Override
+        public void run() {
+            readLock.lock();
+            try {
+                for (final Node node : nodes) {
+                    if (Status.CONNECTED.equals(node.getStatus())) {
+                        ComponentStatusRepository statusRepository = 
componentMetricsRepositoryMap.get(node.getNodeId());
+                        if (statusRepository == null) {
+                            statusRepository = 
createComponentStatusRepository();
+                            
componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
+                        }
+
+                        // ensure this node has a payload
+                        if (node.getHeartbeat() != null && 
node.getHeartbeatPayload() != null) {
+                            // if nothing has been captured or the current 
heartbeat is newer, capture it - comparing the heatbeat created timestamp
+                            // is safe since its marked as XmlTransient so 
we're assured that its based off the same clock that created the last capture 
date
+                            if (statusRepository.getLastCaptureDate() == null 
|| node.getHeartbeat().getCreatedTimestamp() > 
statusRepository.getLastCaptureDate().getTime()) {
+                                
statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus());
+                            }
+                        }
+                    }
+                }
+            } catch (final Throwable t) {
+                logger.warn("Unable to capture component metrics from Node 
heartbeats: " + t);
+                if (logger.isDebugEnabled()) {
+                    logger.warn("", t);
+                }
+            } finally {
+                readLock.unlock("capture component metrics from node 
heartbeats");
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b07e13a1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
index cacd6f9..cb310a0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
@@ -114,7 +114,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
     private List<ACL> acl;
 
 
-    public ZooKeeperStateProvider() throws Exception {
+    public ZooKeeperStateProvider() {
     }
 
 
@@ -232,6 +232,7 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
             if (Code.SESSIONEXPIRED == ke.code()) {
                 invalidateClient();
                 onComponentRemoved(componentId);
+                return;
             }
 
             throw new IOException("Unable to remove state for component with 
ID '" + componentId + "' from ZooKeeper", ke);
@@ -309,6 +310,7 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
             } catch (final KeeperException ke) {
                 if (ke.code() == Code.NONODE) {
                     createNode(path, data);
+                    return;
                 } else {
                     throw ke;
                 }
@@ -320,9 +322,11 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
             if (Code.SESSIONEXPIRED == ke.code()) {
                 invalidateClient();
                 setState(stateValues, version, componentId);
+                return;
             }
             if (Code.NODEEXISTS == ke.code()) {
                 setState(stateValues, version, componentId);
+                return;
             }
 
             throw new IOException("Failed to set cluster-wide state in 
ZooKeeper for component with ID " + componentId, ke);
@@ -347,16 +351,19 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
             if (Code.SESSIONEXPIRED == ke.code()) {
                 invalidateClient();
                 createNode(path, data);
+                return;
             }
 
             // Node already exists. Node must have been created by "someone 
else". Just set the data.
             if (ke.code() == Code.NODEEXISTS) {
                 try {
                     getZooKeeper().setData(path, data, -1);
+                    return;
                 } catch (final KeeperException ke1) {
                     // Node no longer exists -- it was removed by someone 
else. Go recreate the node.
                     if (ke1.code() == Code.NONODE) {
                         createNode(path, data);
+                        return;
                     }
                 } catch (final InterruptedException ie) {
                     throw new IOException("Failed to update cluster-wide state 
due to interruption", ie);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b07e13a1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
index b6272c7..7e03a9c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
@@ -69,8 +69,7 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
         this.provider = createProvider(properties);
     }
 
-    private ZooKeeperStateProvider createProvider(final 
Map<PropertyDescriptor, String> properties) throws Exception {
-        final ZooKeeperStateProvider provider = new ZooKeeperStateProvider();
+    private void initializeProvider(final ZooKeeperStateProvider provider, 
final Map<PropertyDescriptor, String> properties) throws IOException {
         provider.initialize(new StateProviderInitializationContext() {
             @Override
             public String getIdentifier() {
@@ -97,7 +96,11 @@ public class TestZooKeeperStateProvider extends 
AbstractTestStateProvider {
                 return null;
             }
         });
+    }
 
+    private ZooKeeperStateProvider createProvider(final 
Map<PropertyDescriptor, String> properties) throws Exception {
+        final ZooKeeperStateProvider provider = new ZooKeeperStateProvider();
+        initializeProvider(provider, properties);
         provider.enable();
         return provider;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/b07e13a1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
index 7fa54da..f0a9094 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java
@@ -41,7 +41,7 @@ public class StandardComponentStateDAO implements ComponentStateDAO {
 
             return manager.getState(scope);
         } catch (final IOException ioe) {
-            throw new IllegalStateException(String.format("Unable to get the 
state for the specified component %s: %s", componentId, ioe));
+            throw new IllegalStateException(String.format("Unable to get the 
state for the specified component %s: %s", componentId, ioe), ioe);
         }
     }
 
@@ -56,7 +56,7 @@ public class StandardComponentStateDAO implements 
ComponentStateDAO {
             manager.clear(Scope.CLUSTER);
             manager.clear(Scope.LOCAL);
         } catch (final IOException ioe) {
-            throw new IllegalStateException(String.format("Unable to clear the 
state for the specified component %s: %s", componentId, ioe));
+            throw new IllegalStateException(String.format("Unable to clear the 
state for the specified component %s: %s", componentId, ioe), ioe);
         }
     }
 

Reply via email to