NIFI-259: Bug fixes
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b07e13a1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b07e13a1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b07e13a1 Branch: refs/heads/master Commit: b07e13a1d82af20919349387841833dc2daa16c7 Parents: 8f9c0b9 Author: Mark Payne <marka...@hotmail.com> Authored: Thu Jan 21 13:44:44 2016 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Thu Jan 21 13:44:44 2016 -0500 ---------------------------------------------------------------------- .../cluster/manager/impl/WebClusterManager.java | 72 +++++++++++--------- .../zookeeper/ZooKeeperStateProvider.java | 9 ++- .../zookeeper/TestZooKeeperStateProvider.java | 7 +- .../web/dao/impl/StandardComponentStateDAO.java | 4 +- 4 files changed, 53 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b07e13a1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 1ebf010..670736c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -421,40 +421,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } componentStatusSnapshotMillis = snapshotMillis; - Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - readLock.lock(); - try { - for (final Node node : nodes) { - if (Status.CONNECTED.equals(node.getStatus())) { - ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); - if (statusRepository == null) { - statusRepository = createComponentStatusRepository(); - componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository); - } - - // ensure this node has a payload - if (node.getHeartbeat() != null && node.getHeartbeatPayload() != null) { - // if nothing has been captured or the current heartbeat is newer, capture it - comparing the heatbeat created timestamp - // is safe since its marked as XmlTransient so we're assured that its based off the same clock that created the last capture date - if (statusRepository.getLastCaptureDate() == null || node.getHeartbeat().getCreatedTimestamp() > statusRepository.getLastCaptureDate().getTime()) { - statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus()); - } - } - } - } - } catch (final Throwable t) { - logger.warn("Unable to capture component metrics from Node heartbeats: " + t); - if (logger.isDebugEnabled()) { - logger.warn("", t); - } - } finally { - readLock.unlock("capture component metrics from node heartbeats"); - } - } - }, componentStatusSnapshotMillis, componentStatusSnapshotMillis, TimeUnit.MILLISECONDS); - remoteInputPort = properties.getRemoteInputPort(); if (remoteInputPort == null) { remoteSiteListener = null; @@ -496,6 +462,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor)); processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10); processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10); + processScheduler.scheduleFrameworkTask(new CaptureComponentMetrics(), "Capture Component Metrics", componentStatusSnapshotMillis, componentStatusSnapshotMillis, TimeUnit.MILLISECONDS); controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider); } @@ -4595,4 +4562,41 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) { return controllerServiceProvider.getControllerServiceIdentifiers(serviceType); } + + /** + * Captures snapshots of components' metrics + */ + private class CaptureComponentMetrics implements Runnable { + @Override + public void run() { + readLock.lock(); + try { + for (final Node node : nodes) { + if (Status.CONNECTED.equals(node.getStatus())) { + ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId()); + if (statusRepository == null) { + statusRepository = createComponentStatusRepository(); + componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository); + } + + // ensure this node has a payload + if (node.getHeartbeat() != null && node.getHeartbeatPayload() != null) { + // if nothing has been captured or the current heartbeat is newer, capture it - comparing the heatbeat created timestamp + // is safe since its marked as XmlTransient so we're assured that its based off the same clock that created the last capture date + if (statusRepository.getLastCaptureDate() == null || node.getHeartbeat().getCreatedTimestamp() > statusRepository.getLastCaptureDate().getTime()) { + statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus()); + } + } + } + } + } catch (final Throwable t) { + logger.warn("Unable to capture component metrics from Node heartbeats: " + t); + if (logger.isDebugEnabled()) { + logger.warn("", t); + } + } finally { + readLock.unlock("capture component metrics from node heartbeats"); + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b07e13a1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java index cacd6f9..cb310a0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java @@ -114,7 +114,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { private List<ACL> acl; - public ZooKeeperStateProvider() throws Exception { + public ZooKeeperStateProvider() { } @@ -232,6 +232,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { if (Code.SESSIONEXPIRED == ke.code()) { invalidateClient(); onComponentRemoved(componentId); + return; } throw new IOException("Unable to remove state for component with ID '" + componentId + "' from ZooKeeper", ke); @@ -309,6 +310,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { } catch (final KeeperException ke) { if (ke.code() == Code.NONODE) { createNode(path, data); + return; } else { throw ke; } @@ -320,9 +322,11 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { if (Code.SESSIONEXPIRED == ke.code()) { invalidateClient(); setState(stateValues, version, componentId); + return; } if (Code.NODEEXISTS == ke.code()) { setState(stateValues, version, componentId); + return; } throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ke); @@ -347,16 +351,19 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { if (Code.SESSIONEXPIRED == ke.code()) { invalidateClient(); createNode(path, data); + return; } // Node already exists. Node must have been created by "someone else". Just set the data. if (ke.code() == Code.NODEEXISTS) { try { getZooKeeper().setData(path, data, -1); + return; } catch (final KeeperException ke1) { // Node no longer exists -- it was removed by someone else. Go recreate the node. if (ke1.code() == Code.NONODE) { createNode(path, data); + return; } } catch (final InterruptedException ie) { throw new IOException("Failed to update cluster-wide state due to interruption", ie); http://git-wip-us.apache.org/repos/asf/nifi/blob/b07e13a1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java index b6272c7..7e03a9c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java @@ -69,8 +69,7 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider { this.provider = createProvider(properties); } - private ZooKeeperStateProvider createProvider(final Map<PropertyDescriptor, String> properties) throws Exception { - final ZooKeeperStateProvider provider = new ZooKeeperStateProvider(); + private void initializeProvider(final ZooKeeperStateProvider provider, final Map<PropertyDescriptor, String> properties) throws IOException { provider.initialize(new StateProviderInitializationContext() { @Override public String getIdentifier() { @@ -97,7 +96,11 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider { return null; } }); + } + private ZooKeeperStateProvider createProvider(final Map<PropertyDescriptor, String> properties) throws Exception { + final ZooKeeperStateProvider provider = new ZooKeeperStateProvider(); + initializeProvider(provider, properties); provider.enable(); return provider; } http://git-wip-us.apache.org/repos/asf/nifi/blob/b07e13a1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java index 7fa54da..f0a9094 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardComponentStateDAO.java @@ -41,7 +41,7 @@ public class StandardComponentStateDAO implements ComponentStateDAO { return manager.getState(scope); } catch (final IOException ioe) { - throw new IllegalStateException(String.format("Unable to get the state for the specified component %s: %s", componentId, ioe)); + throw new IllegalStateException(String.format("Unable to get the state for the specified component %s: %s", componentId, ioe), ioe); } } @@ -56,7 +56,7 @@ public class StandardComponentStateDAO implements ComponentStateDAO { manager.clear(Scope.CLUSTER); manager.clear(Scope.LOCAL); } catch (final IOException ioe) { - throw new IllegalStateException(String.format("Unable to clear the state for the specified component %s: %s", componentId, ioe)); + throw new IllegalStateException(String.format("Unable to clear the state for the specified component %s: %s", componentId, ioe), ioe); } }