http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java index 02f7d6b..36288d5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java @@ -20,19 +20,16 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; -import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; -import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; import org.apache.nifi.util.ComponentStatusReport; -import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ComponentStatusReport.ComponentType; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.RingBuffer; import org.apache.nifi.util.RingBuffer.ForEachEvaluator; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,22 +230,9 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit return history; } - private static long calculateTaskMillis(final ProcessGroupStatus status) { - long nanos = 0L; - for (final ProcessorStatus procStatus : status.getProcessorStatus()) { - nanos += procStatus.getProcessingNanos(); - } - - for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) { - nanos += calculateTaskMillis(childStatus); - } - - return TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS); - } private static class Capture { - private final Date captureDate; private final ComponentStatusReport statusReport; @@ -266,407 +250,7 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit } } - public static enum RemoteProcessGroupStatusDescriptor { - - SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)", - "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return status.getSentContentSize(); - } - })), - SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)", - "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getSentCount().longValue()); - } - })), - RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)", - "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return status.getReceivedContentSize(); - } - })), - RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)", - "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getReceivedCount().longValue()); - } - })), - RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second", - "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", - Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getReceivedContentSize().longValue() / 300L); - } - })), - SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second", - "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getSentContentSize().longValue() / 300L); - } - })), - TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second", - "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second", - Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L); - } - })), - AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>( - "averageLineageDuration", - "Average Lineage Duration (5 mins)", - "The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.", - Formatter.DURATION, - new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return status.getAverageLineageDuration(TimeUnit.MILLISECONDS); - } - }, new ValueReducer<StatusSnapshot, Long>() { - @Override - public Long reduce(final List<StatusSnapshot> values) { - long millis = 0L; - int count = 0; - - for (final StatusSnapshot snapshot : values) { - final long sent = snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor()).longValue(); - count += sent; - - final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue(); - final long totalMillis = avgMillis * sent; - millis += totalMillis; - } - - return count == 0 ? 0 : millis / count; - } - } - )); - - private final MetricDescriptor<RemoteProcessGroupStatus> descriptor; - - private RemoteProcessGroupStatusDescriptor(final MetricDescriptor<RemoteProcessGroupStatus> descriptor) { - this.descriptor = descriptor; - } - - public String getField() { - return descriptor.getField(); - } - - public MetricDescriptor<RemoteProcessGroupStatus> getDescriptor() { - return descriptor; - } - } - - public static enum ProcessGroupStatusDescriptor { - - BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)", - "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getBytesRead(); - } - })), - BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)", - "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getBytesWritten(); - } - })), - BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)", - "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes", - Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getBytesRead() + status.getBytesWritten(); - } - })), - INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)", - "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", - Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getInputContentSize(); - } - })), - INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)", - "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", - Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getInputCount().longValue(); - } - })), - OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)", - "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", - Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getOutputContentSize(); - } - })), - OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)", - "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", - Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getOutputCount().longValue(); - } - })), - QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes", - "The cumulative size of all FlowFiles queued in all Connections of this Process Group", - Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getQueuedContentSize(); - } - })), - QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count", - "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getQueuedCount().longValue(); - } - })), - TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)", - "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes", - Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return calculateTaskMillis(status); - } - })); - - private MetricDescriptor<ProcessGroupStatus> descriptor; - - private ProcessGroupStatusDescriptor(final MetricDescriptor<ProcessGroupStatus> descriptor) { - this.descriptor = descriptor; - } - - public String getField() { - return descriptor.getField(); - } - - public MetricDescriptor<ProcessGroupStatus> getDescriptor() { - return descriptor; - } - } - - public static enum ConnectionStatusDescriptor { - - INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)", - "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return status.getInputBytes(); - } - })), - INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 mins)", - "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return Long.valueOf(status.getInputCount()); - } - })), - OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)", - "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return status.getOutputBytes(); - } - })), - OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 mins)", - "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return Long.valueOf(status.getOutputCount()); - } - })), - QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes", - "The number of Bytes queued in this Connection", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return status.getQueuedBytes(); - } - })), - QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count", - "The number of FlowFiles queued in this Connection", Formatter.COUNT, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return Long.valueOf(status.getQueuedCount()); - } - })); - - private MetricDescriptor<ConnectionStatus> descriptor; - - private ConnectionStatusDescriptor(final MetricDescriptor<ConnectionStatus> descriptor) { - this.descriptor = descriptor; - } - - public String getField() { - return descriptor.getField(); - } - - public MetricDescriptor<ConnectionStatus> getDescriptor() { - return descriptor; - } - } - public static enum ProcessorStatusDescriptor { - - BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", "Bytes Read (5 mins)", - "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getBytesRead(); - } - })), - BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 mins)", - "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getBytesWritten(); - } - })), - BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes Transferred (5 mins)", - "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getBytesRead() + status.getBytesWritten(); - } - })), - INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)", - "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getInputBytes(); - } - })), - INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 mins)", - "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getInputCount()); - } - })), - OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)", - "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getOutputBytes(); - } - })), - OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 mins)", - "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getOutputCount()); - } - })), - TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes", - Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getInvocations()); - } - })), - TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 mins)", - "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", Formatter.DURATION, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS); - } - })), - FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles Removed (5 mins)", - "The total number of FlowFiles removed by this Processor in the last 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getFlowFilesRemoved()); - } - })), - AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>( - "averageLineageDuration", - "Average Lineage Duration (5 mins)", - "The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.", - Formatter.DURATION, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getAverageLineageDuration(TimeUnit.MILLISECONDS); - } - }, new ValueReducer<StatusSnapshot, Long>() { - @Override - public Long reduce(final List<StatusSnapshot> values) { - long millis = 0L; - int count = 0; - - for (final StatusSnapshot snapshot : values) { - final long removed = snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor()).longValue(); - count += removed; - - count += snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()).longValue(); - - final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue(); - final long totalMillis = avgMillis * removed; - millis += totalMillis; - } - - return count == 0 ? 0 : millis / count; - } - } - )), - AVERAGE_TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>( - "averageTaskMillis", - "Average Task Duration", - "The average duration it took this Processor to complete a task, as averaged over the past 5 minutes", - Formatter.DURATION, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getInvocations() == 0 ? 0 : TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS) / status.getInvocations(); - } - }, - new ValueReducer<StatusSnapshot, Long>() { - @Override - public Long reduce(final List<StatusSnapshot> values) { - long procMillis = 0L; - int invocations = 0; - - for (final StatusSnapshot snapshot : values) { - procMillis += snapshot.getStatusMetrics().get(TASK_MILLIS.getDescriptor()).longValue(); - invocations += snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()).intValue(); - } - - if (invocations == 0) { - return 0L; - } - - return procMillis / invocations; - } - } - )); - - private MetricDescriptor<ProcessorStatus> descriptor; - - private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor) { - this.descriptor = descriptor; - } - - public String getField() { - return descriptor.getField(); - } - - public MetricDescriptor<ProcessorStatus> getDescriptor() { - return descriptor; - } - } @Override public List<MetricDescriptor<ConnectionStatus>> getConnectionMetricDescriptors() {
http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java index d3cfd9e..ad9208e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java @@ -17,50 +17,24 @@ package org.apache.nifi.events; import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.nifi.reporting.Bulletin; /** * */ public class NodeBulletinProcessingStrategy implements BulletinProcessingStrategy { - - private final Lock lock; - private final Set<Bulletin> bulletins; - - public NodeBulletinProcessingStrategy() { - lock = new ReentrantLock(); - bulletins = new LinkedHashSet<>(); - } + private final CircularFifoQueue<Bulletin> ringBuffer = new CircularFifoQueue<>(5); @Override - public void update(final Bulletin bulletin) { - lock.lock(); - try { - bulletins.add(bulletin); - } finally { - lock.unlock(); - } + public synchronized void update(final Bulletin bulletin) { + ringBuffer.add(bulletin); } - public Set<Bulletin> getBulletins() { - final Set<Bulletin> response = new HashSet<>(); - - lock.lock(); - try { - // get all the bulletins currently stored - response.addAll(bulletins); - - // remove the bulletins - bulletins.clear(); - } finally { - lock.unlock(); - } - + public synchronized Set<Bulletin> getBulletins() { + final Set<Bulletin> response = new HashSet<>(ringBuffer); return response; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java index 8aeb34d..7202546 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java @@ -19,7 +19,6 @@ package org.apache.nifi.events; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -207,25 +206,6 @@ public class VolatileBulletinRepository implements BulletinRepository { return controllerBulletins; } - /** - * Overrides the default bulletin processing strategy. When a custom - * bulletin strategy is employed, bulletins will not be persisted in this - * repository and will sent to the specified strategy instead. - * - * @param strategy bulletin strategy - */ - public void overrideDefaultBulletinProcessing(final BulletinProcessingStrategy strategy) { - Objects.requireNonNull(strategy); - this.processingStrategy = strategy; - } - - /** - * Restores the default bulletin processing strategy. - */ - public void restoreDefaultBulletinProcessing() { - this.processingStrategy = new DefaultBulletinProcessingStrategy(); - } - private List<RingBuffer<Bulletin>> getBulletinBuffers(final Bulletin bulletin) { final String storageKey = getBulletinStoreKey(bulletin); http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java index d6bfca0..af73eef 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/cluster/HeartbeatPayloadTest.java @@ -16,16 +16,12 @@ */ package org.apache.nifi.cluster; +import static org.junit.Assert.assertEquals; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.util.ArrayList; -import java.util.List; -import org.apache.nifi.controller.Counter; -import org.apache.nifi.controller.StandardCounter; -import org.apache.nifi.diagnostics.SystemDiagnostics; + import org.apache.nifi.util.NiFiProperties; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -35,15 +31,8 @@ import org.junit.Test; public class HeartbeatPayloadTest { private HeartbeatPayload payload; - - private List<Counter> counters; - - private Counter counter; - private int activeThreadCount; - private int totalFlowFileCount; - private ByteArrayOutputStream marshalledBytes; @BeforeClass @@ -53,19 +42,9 @@ public class HeartbeatPayloadTest { @Before public void setup() { - payload = new HeartbeatPayload(); - activeThreadCount = 15; totalFlowFileCount = 25; - - counters = new ArrayList<>(); - String identifier = "identifier"; - String context = "context"; - String name = "name"; - counter = new StandardCounter(identifier, context, name); - counters.add(counter); - marshalledBytes = new ByteArrayOutputStream(); } @@ -73,48 +52,19 @@ public class HeartbeatPayloadTest { public void testMarshallingWithNoInfo() { HeartbeatPayload.marshal(payload, marshalledBytes); HeartbeatPayload newPayload = HeartbeatPayload.unmarshal(new ByteArrayInputStream(marshalledBytes.toByteArray())); - assertNull(newPayload.getCounters()); assertEquals(0, newPayload.getActiveThreadCount()); assertEquals(0, newPayload.getTotalFlowFileCount()); } @Test public void testMarshalling() { - payload.setActiveThreadCount(activeThreadCount); payload.setTotalFlowFileCount(totalFlowFileCount); - payload.setCounters(counters); - payload.setSystemDiagnostics(new SystemDiagnostics()); HeartbeatPayload.marshal(payload, marshalledBytes); HeartbeatPayload newPayload = HeartbeatPayload.unmarshal(new ByteArrayInputStream(marshalledBytes.toByteArray())); - List<Counter> newCounters = newPayload.getCounters(); - assertEquals(1, newCounters.size()); - - Counter newCounter = newCounters.get(0); - assertCounterEquals(counter, newCounter); - assertEquals(activeThreadCount, newPayload.getActiveThreadCount()); assertEquals(totalFlowFileCount, newPayload.getTotalFlowFileCount()); } - - private void assertCounterEquals(Counter expected, Counter actual) { - assertEquals(expected.getContext(), actual.getContext()); - assertEquals(expected.getIdentifier(), actual.getIdentifier()); - assertEquals(expected.getName(), actual.getName()); - assertEquals(expected.getValue(), actual.getValue()); - } - -// private void assertRepositoryStatusReportEntryEquals(RepositoryStatusReportEntry expected, RepositoryStatusReportEntry actual) { -// assertEquals(expected.getConsumerId(), actual.getConsumerId()); -// assertEquals(expected.getBytesRead(), actual.getBytesRead()); -// assertEquals(expected.getBytesWritten(), actual.getBytesWritten()); -// assertEquals(expected.getContentSizeIn(), actual.getContentSizeIn()); -// assertEquals(expected.getContentSizeOut(), actual.getContentSizeOut()); -// assertEquals(expected.getFlowFilesIn(), actual.getFlowFilesIn()); -// assertEquals(expected.getFlowFilesOut(), actual.getFlowFilesOut()); -// assertEquals(expected.getInvocations(), actual.getInvocations()); -// assertEquals(expected.getProcessingNanos(), actual.getProcessingNanos()); -// } } http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java index 2ba1161..18c55c6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java @@ -108,7 +108,7 @@ public class TestStandardFlowFileQueue { } }).when(provRepo).registerEvents(Mockito.any(Iterable.class)); - queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, null); + queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000); TestFlowFile.idGenerator.set(0L); } http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index f8db35e..644018f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -139,7 +139,7 @@ public class TestStandardProcessSession { final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class); - flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000, null); + flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000); when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); Mockito.doAnswer(new Answer<Object>() { http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index cd4aa27..4094ca4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -82,7 +82,7 @@ public class TestWriteAheadFlowFileRepository { when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class)); final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager(); - final FlowFileQueue queue = new StandardFlowFileQueue("1234", connection, null, null, claimManager, null, swapMgr, null, 10000, null); + final FlowFileQueue queue = new StandardFlowFileQueue("1234", connection, null, null, claimManager, null, swapMgr, null, 10000); when(connection.getFlowFileQueue()).thenReturn(queue); queueProvider.addConnection(connection); http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index 3b33478..8beafdb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -19,7 +19,6 @@ package org.apache.nifi.controller.scheduling; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; import java.lang.reflect.Field; import java.util.ArrayList; @@ -36,10 +35,9 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.components.state.StateManagerProvider; -import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; @@ -79,7 +77,7 @@ public class TestStandardProcessScheduler { public void setup() throws InitializationException { System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); this.refreshNiFiProperties(); - scheduler = new StandardProcessScheduler(Mockito.mock(Heartbeater.class), Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider); + scheduler = new StandardProcessScheduler(Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider); scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class)); reportingTask = new TestReportingTask(); @@ -507,6 +505,6 @@ public class TestStandardProcessScheduler { } private ProcessScheduler createScheduler() { - return new StandardProcessScheduler(mock(Heartbeater.class), null, null, stateMgrProvider); + return new StandardProcessScheduler(null, null, stateMgrProvider); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 5abefda..f7a3386 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -29,7 +29,6 @@ import java.util.UUID; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; -import org.apache.nifi.controller.Heartbeater; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; @@ -76,8 +75,7 @@ public class TestStandardControllerServiceProvider { } private StandardProcessScheduler createScheduler() { - final Heartbeater heartbeater = Mockito.mock(Heartbeater.class); - return new StandardProcessScheduler(heartbeater, null, null, stateManagerProvider); + return new StandardProcessScheduler(null, null, stateManagerProvider); } @Test http://git-wip-us.apache.org/repos/asf/nifi/blob/137cccc8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index bc5245c..cfe18c5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -22,8 +22,8 @@ import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.web.api.dto.BulletinBoardDTO; import org.apache.nifi.web.api.dto.BulletinQueryDTO; import org.apache.nifi.web.api.dto.ClusterDTO; -import org.apache.nifi.web.api.dto.ComponentStateDTO; import org.apache.nifi.web.api.dto.ComponentHistoryDTO; +import org.apache.nifi.web.api.dto.ComponentStateDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.ControllerConfigurationDTO; import org.apache.nifi.web.api.dto.ControllerDTO; @@ -39,7 +39,6 @@ import org.apache.nifi.web.api.dto.FunnelDTO; import org.apache.nifi.web.api.dto.LabelDTO; import org.apache.nifi.web.api.dto.ListingRequestDTO; import org.apache.nifi.web.api.dto.NodeDTO; -import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO; import org.apache.nifi.web.api.dto.PortDTO; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; @@ -61,16 +60,12 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; -import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; -import org.apache.nifi.web.api.dto.status.NodeStatusDTO; +import org.apache.nifi.web.api.dto.status.PortStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; +import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO; +import org.apache.nifi.web.api.dto.status.RemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import java.util.Collection; @@ -388,6 +383,15 @@ public interface NiFiServiceFacade { ProcessorDTO getProcessor(String id); /** + * Gets the processor status. + * + * @param groupId group + * @param id id + * @return status + */ + ProcessorStatusDTO getProcessorStatus(String groupId, String id); + + /** * Gets the processor status history. * * @param groupId group @@ -478,6 +482,15 @@ public interface NiFiServiceFacade { ConnectionDTO getConnection(String groupId, String connectionId); /** + * Gets the status of the specified connection. + * + * @param groupId group + * @param connectionId connection + * @return status + */ + ConnectionStatusDTO getConnectionStatus(String groupId, String connectionId); + + /** * Gets the status history of the specified connection. * * @param groupId group @@ -649,6 +662,15 @@ public interface NiFiServiceFacade { Set<PortDTO> getInputPorts(String groupId); /** + * Gets the input port status. + * + * @param groupId group + * @param inputPortId input port + * @return status + */ + PortStatusDTO getInputPortStatus(String groupId, String inputPortId); + + /** * Determines if the input port could be updated. * * @param groupId The id of the group @@ -715,6 +737,15 @@ public interface NiFiServiceFacade { Set<PortDTO> getOutputPorts(String groupId); /** + * Gets the output port status. + * + * @param groupId group + * @param outputPortId output port + * @return status + */ + PortStatusDTO getOutputPortStatus(String groupId, String outputPortId); + + /** * Determines if the output port could be updated. * * @param groupId The id of the group @@ -851,6 +882,15 @@ public interface NiFiServiceFacade { Set<RemoteProcessGroupDTO> getRemoteProcessGroups(String groupId); /** + * Gets the remote process group status. + * + * @param groupId group + * @param id remote process group + * @return status + */ + RemoteProcessGroupStatusDTO getRemoteProcessGroupStatus(String groupId, String id); + + /** * Gets the remote process group status history. * * @param groupId The id of the parent group @@ -1507,103 +1547,6 @@ public interface NiFiServiceFacade { */ void deleteNode(String nodeId); - /** - * Returns the status the specified node id. - * - * @param nodeId The id of the desired node - * @return The node status - */ - NodeStatusDTO getNodeStatus(String nodeId); - - /** - * Returns the system diagnostics for the specified node id. - * - * @param nodeId The id of the desired node - * @return The node status - */ - NodeSystemDiagnosticsDTO getNodeSystemDiagnostics(String nodeId); - - /** - * Returns the cluster's status. - * - * @return The cluster status - */ - ClusterStatusDTO getClusterStatus(); - - /** - * Returns a processor's status for each node connected to the cluster. - * - * @param processorId a processor identifier - * @return The cluster processor status transfer object. - */ - ClusterProcessorStatusDTO getClusterProcessorStatus(String processorId); - - /** - * @param processorId id - * @return the processor status history for each node connected to the cluster - */ - ClusterStatusHistoryDTO getClusterProcessorStatusHistory(String processorId); - - /** - * Returns a connection's status for each node connected to the cluster. - * - * @param connectionId a connection identifier - * @return The cluster connection status transfer object. - */ - ClusterConnectionStatusDTO getClusterConnectionStatus(String connectionId); - - /** - * @param connectionId id - * @return the connection status history for each node connected to the cluster - */ - ClusterStatusHistoryDTO getClusterConnectionStatusHistory(String connectionId); - - /** - * @param processGroupId id - * @return the process group status history for each node connected to the cluster - */ - ClusterStatusHistoryDTO getClusterProcessGroupStatusHistory(String processGroupId); - - /** - * Returns a process group's status for each node connected to the cluster. - * - * @param processorId a process group identifier - * @return The cluster process group status transfer object. - */ - ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processorId); - - /** - * Returns the remote process group status history for each node connected to the cluster. - * - * @param remoteProcessGroupId a remote process group identifier - * @return The cluster status history - */ - ClusterStatusHistoryDTO getClusterRemoteProcessGroupStatusHistory(String remoteProcessGroupId); - - /** - * Returns a remote process group's status for each node connected to the cluster. - * - * @param remoteProcessGroupId a remote process group identifier - * @return The cluster remote process group status transfer object. - */ - ClusterRemoteProcessGroupStatusDTO getClusterRemoteProcessGroupStatus(String remoteProcessGroupId); - - /** - * Returns an input port's status for each node connected to the cluster. - * - * @param inputPortId a port identifier - * @return The cluster port status transfer object. - */ - ClusterPortStatusDTO getClusterInputPortStatus(String inputPortId); - - /** - * Returns an output port's status for each node connected to the cluster. - * - * @param outputPortId a port identifier - * @return The cluster port status transfer object. - */ - ClusterPortStatusDTO getClusterOutputPortStatus(String outputPortId); - // ---------------------------------------- // BulletinBoard methods // ----------------------------------------
