http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java deleted file mode 100644 index 1195bc9..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller; - -public interface Heartbeater { - - void heartbeat(); -}
http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java index e91ba9a..0d7f3ff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java @@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.controller.Heartbeater; public interface ControllerServiceNode extends ConfiguredComponent { @@ -64,10 +63,8 @@ public interface ControllerServiceNode extends ConfiguredComponent { * initiate service enabling task as well as its re-tries * @param administrativeYieldMillis * the amount of milliseconds to wait for administrative yield - * @param heartbeater - * the instance of {@link Heartbeater} */ - void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater); + void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis); /** * Will disable this service. Disabling of the service typically means @@ -76,10 +73,8 @@ public interface ControllerServiceNode extends ConfiguredComponent { * @param scheduler * implementation of {@link ScheduledExecutorService} used to * initiate service disabling task - * @param heartbeater - * the instance of {@link Heartbeater} */ - void disable(ScheduledExecutorService scheduler, Heartbeater heartbeater); + void disable(ScheduledExecutorService scheduler); /** * @return the ControllerServiceReference that describes which components are referencing this Controller Service @@ -139,12 +134,12 @@ public interface ControllerServiceNode extends ConfiguredComponent { /** * Returns 'true' if this service is active. The service is considered to be * active if and only if it's - * {@link #enable(ScheduledExecutorService, long, Heartbeater)} operation + * {@link #enable(ScheduledExecutorService, long)} operation * has been invoked and the service has been transitioned to ENABLING state. * The service will also remain 'active' after its been transitioned to * ENABLED state. <br> * The service will be de-activated upon invocation of - * {@link #disable(ScheduledExecutorService, Heartbeater)}. + * {@link #disable(ScheduledExecutorService)}. */ boolean isActive(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java index ff3ad4e..1146a39 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java @@ -20,18 +20,14 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.io.OutputStream; -import java.util.List; + import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; import javax.xml.bind.Marshaller; import javax.xml.bind.Unmarshaller; import javax.xml.bind.annotation.XmlRootElement; -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + import org.apache.nifi.cluster.protocol.ProtocolException; -import org.apache.nifi.controller.Counter; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.diagnostics.SystemDiagnostics; -import org.apache.nifi.jaxb.CounterAdapter; /** * The payload of the heartbeat. The payload contains status to inform the cluster manager the current workload of this node. @@ -50,23 +46,11 @@ public class HeartbeatPayload { } } - private List<Counter> counters; - private ProcessGroupStatus processGroupStatus; private int activeThreadCount; private long totalFlowFileCount; private long totalFlowFileBytes; - private SystemDiagnostics systemDiagnostics; private long systemStartTime; - @XmlJavaTypeAdapter(CounterAdapter.class) - public List<Counter> getCounters() { - return counters; - } - - public void setCounters(final List<Counter> counters) { - this.counters = counters; - } - public int getActiveThreadCount() { return activeThreadCount; } @@ -91,22 +75,6 @@ public class HeartbeatPayload { this.totalFlowFileBytes = totalFlowFileBytes; } - public ProcessGroupStatus getProcessGroupStatus() { - return processGroupStatus; - } - - public void setProcessGroupStatus(final ProcessGroupStatus processGroupStatus) { - this.processGroupStatus = processGroupStatus; - } - - public SystemDiagnostics getSystemDiagnostics() { - return systemDiagnostics; - } - - public void setSystemDiagnostics(final SystemDiagnostics systemDiagnostics) { - this.systemDiagnostics = systemDiagnostics; - } - public long getSystemStartTime() { return systemStartTime; } http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 1ef18c0..d43a3db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.StandardFlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueue; @@ -71,7 +70,7 @@ public final class StandardConnection implements Connection { relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships)); scheduler = builder.scheduler; flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager, - scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold(), builder.heartbeater); + scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold()); hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); } @@ -270,7 +269,6 @@ public final class StandardConnection implements Connection { private FlowFileRepository flowFileRepository; private ProvenanceEventRepository provenanceRepository; private ResourceClaimManager resourceClaimManager; - private Heartbeater heartbeater; public Builder(final ProcessScheduler scheduler) { this.scheduler = scheduler; @@ -306,11 +304,6 @@ public final class StandardConnection implements Connection { return this; } - public Builder heartbeater(final Heartbeater heartbeater) { - this.heartbeater = heartbeater; - return this; - } - public Builder bendPoints(final List<Position> bendPoints) { this.bendPoints.clear(); this.bendPoints.addAll(bendPoints); http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index c9aaceb..eb6b6c7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,7 +16,40 @@ */ package org.apache.nifi.controller; -import com.sun.jersey.api.client.ClientHandlerException; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.net.ssl.SSLContext; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.admin.service.AuditService; @@ -27,16 +60,13 @@ import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; import org.apache.nifi.annotation.notification.PrimaryNodeState; -import org.apache.nifi.cluster.BulletinsPayload; import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.Heartbeat; -import org.apache.nifi.cluster.protocol.NodeBulletins; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; -import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.connectable.Connectable; @@ -109,7 +139,6 @@ import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.events.EventReporter; -import org.apache.nifi.events.NodeBulletinProcessingStrategy; import org.apache.nifi.events.VolatileBulletinRepository; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -184,41 +213,9 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.LockSupport; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static java.util.Objects.requireNonNull; +import com.sun.jersey.api.client.ClientHandlerException; -public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, Heartbeater, QueueProvider { +public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider { // default repository implementations public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository"; @@ -308,7 +305,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R /** * timer to periodically send heartbeats to the cluster */ - private ScheduledFuture<?> bulletinFuture; private ScheduledFuture<?> heartbeatGeneratorFuture; private ScheduledFuture<?> heartbeatSenderFuture; @@ -318,8 +314,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R */ private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null); - private final AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber; - // guarded by rwLock /** * the node identifier; @@ -420,7 +414,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository()); bulletinRepository = new VolatileBulletinRepository(); - nodeBulletinSubscriber = new AtomicReference<>(); try { this.provenanceEventRepository = createProvenanceRepository(properties); @@ -437,7 +430,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new RuntimeException(e); } - processScheduler = new StandardProcessScheduler(this, this, encryptor, stateManagerProvider); + processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider); @@ -833,7 +826,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R .resourceClaimManager(resourceClaimManager) .flowFileRepository(flowFileRepository) .provenanceRepository(provenanceEventRepository) - .heartbeater(this) .build(); } @@ -2122,7 +2114,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final ProcessGroupStatus status = new ProcessGroupStatus(); status.setId(group.getIdentifier()); status.setName(group.getName()); - status.setCreationTimestamp(new Date().getTime()); int activeGroupThreads = 0; long bytesRead = 0L; long bytesWritten = 0L; @@ -2899,7 +2890,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public Counter resetCounter(final String identifier) { final CounterRepository counterRepo = counterRepositoryRef.get(); final Counter resetValue = counterRepo.resetCounter(identifier); - heartbeat(); return resetValue; } @@ -2955,8 +2945,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R stopHeartbeating(); - bulletinFuture = clusterTaskExecutor.scheduleWithFixedDelay(new BulletinsTask(protocolSender), 250, 2000, TimeUnit.MILLISECONDS); - final HeartbeatMessageGeneratorTask heartbeatMessageGeneratorTask = new HeartbeatMessageGeneratorTask(); heartbeatMessageGeneratorTaskRef.set(heartbeatMessageGeneratorTask); heartbeatGeneratorFuture = clusterTaskExecutor.scheduleWithFixedDelay(heartbeatMessageGeneratorTask, 0, heartbeatDelaySeconds, TimeUnit.SECONDS); @@ -3007,10 +2995,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (heartbeatSenderFuture != null) { heartbeatSenderFuture.cancel(false); } - - if (bulletinFuture != null) { - bulletinFuture.cancel(false); - } } finally { writeLock.unlock(); } @@ -3135,8 +3119,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // update the bulletin repository if (isChanging) { if (clustered) { - nodeBulletinSubscriber.set(new NodeBulletinProcessingStrategy()); - bulletinRepository.overrideDefaultBulletinProcessing(nodeBulletinSubscriber.get()); stateManagerProvider.enableClusterProvider(); if (zooKeeperStateServer != null) { @@ -3175,7 +3157,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L)); } } else { - bulletinRepository.restoreDefaultBulletinProcessing(); if (zooKeeperStateServer != null) { zooKeeperStateServer.shutdown(); } @@ -3481,6 +3462,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return replayFlowFile(record, requestor); } + @SuppressWarnings("deprecation") public ProvenanceEventRecord replayFlowFile(final ProvenanceEventRecord event, final String requestor) throws IOException { if (event == null) { throw new NullPointerException(); @@ -3621,7 +3603,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } - @Override public void heartbeat() { if (!isClustered()) { return; @@ -3636,110 +3617,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } - private class BulletinsTask implements Runnable { - - private final NodeProtocolSender protocolSender; - private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US); - - public BulletinsTask(final NodeProtocolSender protocolSender) { - if (protocolSender == null) { - throw new IllegalArgumentException("NodeProtocolSender may not be null."); - } - this.protocolSender = protocolSender; - } - - @Override - public void run() { - try { - final NodeBulletinsMessage message = createBulletinsMessage(); - if (message == null) { - return; - } - - protocolSender.sendBulletins(message); - if (LOG.isDebugEnabled()) { - LOG.debug( - String.format( - "Sending bulletins to cluster manager at %s", - dateFormatter.format(new Date()))); - } - - } catch (final UnknownServiceAddressException usae) { - if (LOG.isDebugEnabled()) { - LOG.debug(usae.getMessage()); - } - } catch (final Exception ex) { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to send bulletins to cluster manager due to: " + ex, ex); - } - } - } - - private boolean isIllegalXmlChar(final char c) { - return c < 0x20 && c != 0x09 && c != 0x0A && c != 0x0D; - } - - private boolean containsIllegalXmlChars(final Bulletin bulletin) { - final String message = bulletin.getMessage(); - for (int i = 0; i < message.length(); i++) { - final char c = message.charAt(i); - if (isIllegalXmlChar(c)) { - return true; - } - } - - return false; - } - - private String stripIllegalXmlChars(final String value) { - final StringBuilder sb = new StringBuilder(value.length()); - for (int i = 0; i < value.length(); i++) { - final char c = value.charAt(i); - sb.append(isIllegalXmlChar(c) ? '?' : c); - } - - return sb.toString(); - } - - private NodeBulletinsMessage createBulletinsMessage() { - final Set<Bulletin> nodeBulletins = nodeBulletinSubscriber.get().getBulletins(); - final Set<Bulletin> escapedNodeBulletins = new HashSet<>(nodeBulletins.size()); - - // ensure there are some bulletins to report - if (nodeBulletins.isEmpty()) { - return null; - } - - for (final Bulletin bulletin : nodeBulletins) { - final Bulletin escapedBulletin; - if (containsIllegalXmlChars(bulletin)) { - final String escapedBulletinMessage = stripIllegalXmlChars(bulletin.getMessage()); - - if (bulletin.getGroupId() == null) { - escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage); - } else { - escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(), - bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage); - } - } else { - escapedBulletin = bulletin; - } - - escapedNodeBulletins.add(escapedBulletin); - } - - // create the bulletin payload - final BulletinsPayload payload = new BulletinsPayload(); - payload.setBulletins(escapedNodeBulletins); - - // create bulletin message - final NodeBulletins bulletins = new NodeBulletins(getNodeId(), payload.marshal()); - final NodeBulletinsMessage message = new NodeBulletinsMessage(); - message.setBulletins(bulletins); - - return message; - } - } private class HeartbeatSendTask implements Runnable { @@ -3816,20 +3693,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return null; } - final ProcessGroupStatus procGroupStatus = getGroupStatus(bean.getRootGroup(), getProcessorStats()); // create heartbeat payload final HeartbeatPayload hbPayload = new HeartbeatPayload(); hbPayload.setSystemStartTime(systemStartTime); - hbPayload.setActiveThreadCount(procGroupStatus.getActiveThreadCount()); + hbPayload.setActiveThreadCount(getActiveThreadCount()); final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup()); hbPayload.setTotalFlowFileCount(queueSize.getObjectCount()); hbPayload.setTotalFlowFileBytes(queueSize.getByteCount()); - hbPayload.setCounters(getCounters()); - hbPayload.setSystemDiagnostics(getSystemDiagnostics()); - hbPayload.setProcessGroupStatus(procGroupStatus); - // create heartbeat message final Heartbeat heartbeat = new Heartbeat(getNodeId(), bean.isPrimary(), bean.isConnected(), hbPayload.marshal()); final HeartbeatMessage message = new HeartbeatMessage(); http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 0f3ffe0..6735959 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -111,7 +111,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final FlowFileRepository flowFileRepository; private final ProvenanceEventRepository provRepository; private final ResourceClaimManager resourceClaimManager; - private final Heartbeater heartbeater; private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>(); @@ -120,8 +119,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { private final ProcessScheduler scheduler; public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, - final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold, - final Heartbeater heartbeater) { + final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) { activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>())); priorities = new ArrayList<>(); swapQueue = new ArrayList<>(); @@ -135,7 +133,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { this.swapThreshold = swapThreshold; this.scheduler = scheduler; this.connection = connection; - this.heartbeater = heartbeater; readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100); writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100); @@ -1182,9 +1179,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue { logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}", dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), StandardFlowFileQueue.this.getIdentifier(), requestor); dropRequest.setState(DropFlowFileState.COMPLETE); - if (heartbeater != null) { - heartbeater.heartbeat(); - } } catch (final Exception e) { logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), e.toString()); logger.error("", e); http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index f7e968e..3e4d3a3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -37,7 +37,6 @@ import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; @@ -70,7 +69,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { private static final Logger LOG = LoggerFactory.getLogger(StandardProcessScheduler.class); private final ControllerServiceProvider controllerServiceProvider; - private final Heartbeater heartbeater; private final long administrativeYieldMillis; private final String administrativeYieldDuration; private final StateManagerProvider stateManagerProvider; @@ -85,9 +83,8 @@ public final class StandardProcessScheduler implements ProcessScheduler { private final StringEncryptor encryptor; - public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, + public StandardProcessScheduler(final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManagerProvider stateManagerProvider) { - this.heartbeater = heartbeater; this.controllerServiceProvider = controllerServiceProvider; this.encryptor = encryptor; this.stateManagerProvider = stateManagerProvider; @@ -303,7 +300,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public void trigger() { getSchedulingAgent(procNode).schedule(procNode, scheduleState); - heartbeater.heartbeat(); } @Override @@ -441,7 +437,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); try (final NarCloseable x = NarCloseable.withNarLoader()) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext); - heartbeater.heartbeat(); } } } @@ -541,12 +536,12 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public void enableControllerService(final ControllerServiceNode service) { - service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, this.heartbeater); + service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis); } @Override public void disableControllerService(final ControllerServiceNode service) { - service.disable(this.componentLifeCycleThreadPool, this.heartbeater); + service.disable(this.componentLifeCycleThreadPool); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 3f24ff1..bed6a35 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -36,7 +36,6 @@ import org.apache.nifi.controller.AbstractConfiguredComponent; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.annotation.OnConfigured; import org.apache.nifi.controller.exception.ComponentLifeCycleException; @@ -273,8 +272,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i * as it reached ENABLED state. */ @Override - public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis, - final Heartbeater heartbeater) { + public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) { if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) { this.active.set(true); final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null); @@ -287,13 +285,11 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i synchronized (active) { shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED); } - if (shouldEnable) { - heartbeater.heartbeat(); - } else { + if (!shouldEnable) { LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated."); // Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be // set to DISABLING (see disable() operation) - invokeDisable(configContext, heartbeater); + invokeDisable(configContext); stateRef.set(ControllerServiceState.DISABLED); } } catch (Exception e) { @@ -301,7 +297,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this); componentLog.error("Failed to invoke @OnEnabled method due to {}", cause); LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString()); - invokeDisable(configContext, heartbeater); + invokeDisable(configContext); if (isActive()) { scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS); @@ -323,14 +319,14 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i * If such transition doesn't succeed (the service is still in ENABLING state) * then the service will still be transitioned to DISABLING state to ensure that * no other transition could happen on this service. However in such event - * (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long, Heartbeater)} - * operation will initiate service disabling javadoc for (see {@link #enable(ScheduledExecutorService, long, Heartbeater)} + * (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long)} + * operation will initiate service disabling javadoc for (see {@link #enable(ScheduledExecutorService, long)} * <br> * Upon successful invocation of @OnDisabled this service will be transitioned to * DISABLED state. */ @Override - public void disable(ScheduledExecutorService scheduler, final Heartbeater heartbeater) { + public void disable(ScheduledExecutorService scheduler) { /* * The reason for synchronization is to ensure consistency of the * service state when another thread is in the middle of enabling this @@ -347,10 +343,9 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i @Override public void run() { try { - invokeDisable(configContext, heartbeater); + invokeDisable(configContext); } finally { stateRef.set(ControllerServiceState.DISABLED); - heartbeater.heartbeat(); } } }); @@ -362,7 +357,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i /** * */ - private void invokeDisable(ConfigurationContext configContext, Heartbeater heartbeater) { + private void invokeDisable(ConfigurationContext configContext) { try { ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java new file mode 100644 index 0000000..8b9f383 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.status.history; + +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; + +public enum ConnectionStatusDescriptor { + INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>( + "inputBytes", + "Bytes In (5 mins)", + "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", + Formatter.DATA_SIZE, + new ValueMapper<ConnectionStatus>() { + @Override + public Long getValue(final ConnectionStatus status) { + return status.getInputBytes(); + } + })), + + INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>( + "inputCount", + "FlowFiles In (5 mins)", + "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", + Formatter.COUNT, + new ValueMapper<ConnectionStatus>() { + @Override + public Long getValue(final ConnectionStatus status) { + return Long.valueOf(status.getInputCount()); + } + })), + + OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>( + "outputBytes", + "Bytes Out (5 mins)", + "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", + Formatter.DATA_SIZE, + new ValueMapper<ConnectionStatus>() { + @Override + public Long getValue(final ConnectionStatus status) { + return status.getOutputBytes(); + } + })), + + OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>( + "outputCount", + "FlowFiles Out (5 mins)", + "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", + Formatter.COUNT, + new ValueMapper<ConnectionStatus>() { + @Override + public Long getValue(final ConnectionStatus status) { + return Long.valueOf(status.getOutputCount()); + } + })), + + QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>( + "queuedBytes", + "Queued Bytes", + "The number of Bytes queued in this Connection", + Formatter.DATA_SIZE, + new ValueMapper<ConnectionStatus>() { + @Override + public Long getValue(final ConnectionStatus status) { + return status.getQueuedBytes(); + } + })), + + QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>( + "queuedCount", + "Queued Count", + "The number of FlowFiles queued in this Connection", + Formatter.COUNT, + new ValueMapper<ConnectionStatus>() { + @Override + public Long getValue(final ConnectionStatus status) { + return Long.valueOf(status.getQueuedCount()); + } + })); + + + private MetricDescriptor<ConnectionStatus> descriptor; + + private ConnectionStatusDescriptor(final MetricDescriptor<ConnectionStatus> descriptor) { + this.descriptor = descriptor; + } + + public String getField() { + return descriptor.getField(); + } + + public MetricDescriptor<ConnectionStatus> getDescriptor() { + return descriptor; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java new file mode 100644 index 0000000..d5325d0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.status.history; + +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; + +public enum ProcessGroupStatusDescriptor { + + BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)", + "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getBytesRead(); + } + })), + + BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)", + "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getBytesWritten(); + } + })), + + BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)", + "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes", + Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getBytesRead() + status.getBytesWritten(); + } + })), + + INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)", + "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", + Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getInputContentSize(); + } + })), + + INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)", + "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", + Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getInputCount().longValue(); + } + })), + + OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)", + "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", + Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getOutputContentSize(); + } + })), + + OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)", + "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", + Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getOutputCount().longValue(); + } + })), + + QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes", + "The cumulative size of all FlowFiles queued in all Connections of this Process Group", + Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getQueuedContentSize(); + } + })), + + QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count", + "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return status.getQueuedCount().longValue(); + } + })), + + TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)", + "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes", + Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() { + @Override + public Long getValue(final ProcessGroupStatus status) { + return calculateTaskMillis(status); + } + })); + + private MetricDescriptor<ProcessGroupStatus> descriptor; + + private ProcessGroupStatusDescriptor(final MetricDescriptor<ProcessGroupStatus> descriptor) { + this.descriptor = descriptor; + } + + public String getField() { + return descriptor.getField(); + } + + public MetricDescriptor<ProcessGroupStatus> getDescriptor() { + return descriptor; + } + + + private static long calculateTaskMillis(final ProcessGroupStatus status) { + long nanos = 0L; + + for (final ProcessorStatus procStatus : status.getProcessorStatus()) { + nanos += procStatus.getProcessingNanos(); + } + + for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) { + nanos += calculateTaskMillis(childStatus); + } + + return TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java new file mode 100644 index 0000000..89e8aa0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.status.history; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; + +public enum ProcessorStatusDescriptor { + BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>( + "bytesRead", + "Bytes Read (5 mins)", + "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", + Formatter.DATA_SIZE, + new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return status.getBytesRead(); + } + })), + + BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>( + "bytesWritten", + "Bytes Written (5 mins)", + "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes", + Formatter.DATA_SIZE, + new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return status.getBytesWritten(); + } + })), + + BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>( + "bytesTransferred", + "Bytes Transferred (5 mins)", + "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", + Formatter.DATA_SIZE, + new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return status.getBytesRead() + status.getBytesWritten(); + } + })), + + INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>( + "inputBytes", + "Bytes In (5 mins)", + "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", + Formatter.DATA_SIZE, + new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return status.getInputBytes(); + } + })), + + INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>( + "inputCount", + "FlowFiles In (5 mins)", + "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", + Formatter.COUNT, new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return Long.valueOf(status.getInputCount()); + } + })), + OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>( + "outputBytes", + "Bytes Out (5 mins)", + "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", + Formatter.DATA_SIZE, + new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return status.getOutputBytes(); + } + })), + + OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>( + "outputCount", + "FlowFiles Out (5 mins)", + "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", + Formatter.COUNT, + new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return Long.valueOf(status.getOutputCount()); + } + })), + + TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>( + "taskCount", + "Tasks (5 mins)", + "The number of tasks that this Processor has completed in the past 5 minutes", + Formatter.COUNT, + new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return Long.valueOf(status.getInvocations()); + } + })), + + TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>( + "taskMillis", + "Total Task Duration (5 mins)", + "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", + Formatter.DURATION, + new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS); + } + })), + + FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>( + "flowFilesRemoved", + "FlowFiles Removed (5 mins)", + "The total number of FlowFiles removed by this Processor in the last 5 minutes", + Formatter.COUNT, + new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return Long.valueOf(status.getFlowFilesRemoved()); + } + })), + + AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>( + "averageLineageDuration", + "Average Lineage Duration (5 mins)", + "The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.", + Formatter.DURATION, + new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return status.getAverageLineageDuration(TimeUnit.MILLISECONDS); + } + }, new ValueReducer<StatusSnapshot, Long>() { + @Override + public Long reduce(final List<StatusSnapshot> values) { + long millis = 0L; + int count = 0; + + for (final StatusSnapshot snapshot : values) { + final long removed = snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor()).longValue(); + count += removed; + + count += snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()).longValue(); + + final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue(); + final long totalMillis = avgMillis * removed; + millis += totalMillis; + } + + return count == 0 ? 0 : millis / count; + } + } + )), + + AVERAGE_TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>( + "averageTaskMillis", + "Average Task Duration", + "The average duration it took this Processor to complete a task, as averaged over the past 5 minutes", + Formatter.DURATION, + new ValueMapper<ProcessorStatus>() { + @Override + public Long getValue(final ProcessorStatus status) { + return status.getInvocations() == 0 ? 0 : TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS) / status.getInvocations(); + } + }, + new ValueReducer<StatusSnapshot, Long>() { + @Override + public Long reduce(final List<StatusSnapshot> values) { + long procMillis = 0L; + int invocations = 0; + + for (final StatusSnapshot snapshot : values) { + procMillis += snapshot.getStatusMetrics().get(TASK_MILLIS.getDescriptor()).longValue(); + invocations += snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()).intValue(); + } + + if (invocations == 0) { + return 0L; + } + + return procMillis / invocations; + } + })); + + private MetricDescriptor<ProcessorStatus> descriptor; + + private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor) { + this.descriptor = descriptor; + } + + public String getField() { + return descriptor.getField(); + } + + public MetricDescriptor<ProcessorStatus> getDescriptor() { + return descriptor; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java new file mode 100644 index 0000000..0499d65 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.status.history; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; + +public enum RemoteProcessGroupStatusDescriptor { + SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)", + "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return status.getSentContentSize(); + } + })), + + SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)", + "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return Long.valueOf(status.getSentCount().longValue()); + } + })), + + RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)", + "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return status.getReceivedContentSize(); + } + })), + + RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)", + "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return Long.valueOf(status.getReceivedCount().longValue()); + } + })), + + RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second", + "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", + Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return Long.valueOf(status.getReceivedContentSize().longValue() / 300L); + } + })), + + SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second", + "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return Long.valueOf(status.getSentContentSize().longValue() / 300L); + } + })), + + TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second", + "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second", + Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L); + } + })), + + AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>( + "averageLineageDuration", + "Average Lineage Duration (5 mins)", + "The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.", + Formatter.DURATION, + new ValueMapper<RemoteProcessGroupStatus>() { + @Override + public Long getValue(final RemoteProcessGroupStatus status) { + return status.getAverageLineageDuration(TimeUnit.MILLISECONDS); + } + }, new ValueReducer<StatusSnapshot, Long>() { + @Override + public Long reduce(final List<StatusSnapshot> values) { + long millis = 0L; + int count = 0; + + for (final StatusSnapshot snapshot : values) { + final long sent = snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor()).longValue(); + count += sent; + + final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue(); + final long totalMillis = avgMillis * sent; + millis += totalMillis; + } + + return count == 0 ? 0 : millis / count; + } + })); + + private final MetricDescriptor<RemoteProcessGroupStatus> descriptor; + + private RemoteProcessGroupStatusDescriptor(final MetricDescriptor<RemoteProcessGroupStatus> descriptor) { + this.descriptor = descriptor; + } + + public String getField() { + return descriptor.getField(); + } + + public MetricDescriptor<RemoteProcessGroupStatus> getDescriptor() { + return descriptor; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java index 014b0a6..756e576 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java @@ -44,9 +44,9 @@ public class StatusHistoryUtil { final StatusHistoryDTO dto = new StatusHistoryDTO(); dto.setGenerated(new Date()); - dto.setDetails(componentDetails); + dto.setComponentDetails(componentDetails); dto.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(metricDescriptors)); - dto.setStatusSnapshots(snapshotDtos); + dto.setAggregateSnapshots(snapshotDtos); return dto; }
