http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java new file mode 100644 index 0000000..89e8aa0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+/**
+ * Status-history metrics that are read from a {@link ProcessorStatus}.
+ *
+ * <p>Each constant wraps a {@link StandardMetricDescriptor} whose {@link ValueMapper} extracts the
+ * metric value from a single {@code ProcessorStatus}. The two averaged metrics
+ * ({@link #AVERAGE_LINEAGE_DURATION} and {@link #AVERAGE_TASK_MILLIS}) additionally supply a
+ * {@link ValueReducer} that combines several {@link StatusSnapshot}s into one value.</p>
+ */
+public enum ProcessorStatusDescriptor {
+    BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>(
+        "bytesRead",
+        "Bytes Read (5 mins)",
+        "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getBytesRead();
+            }
+        })),
+
+    BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>(
+        "bytesWritten",
+        "Bytes Written (5 mins)",
+        "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getBytesWritten();
+            }
+        })),
+
+    BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>(
+        "bytesTransferred",
+        "Bytes Transferred (5 mins)",
+        "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getBytesRead() + status.getBytesWritten();
+            }
+        })),
+
+    INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>(
+        "inputBytes",
+        "Bytes In (5 mins)",
+        "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getInputBytes();
+            }
+        })),
+
+    INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+        "inputCount",
+        "FlowFiles In (5 mins)",
+        "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getInputCount());
+            }
+        })),
+
+    OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>(
+        "outputBytes",
+        "Bytes Out (5 mins)",
+        "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getOutputBytes();
+            }
+        })),
+
+    OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+        "outputCount",
+        "FlowFiles Out (5 mins)",
+        "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getOutputCount());
+            }
+        })),
+
+    TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+        "taskCount",
+        "Tasks (5 mins)",
+        "The number of tasks that this Processor has completed in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getInvocations());
+            }
+        })),
+
+    TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
+        "taskMillis",
+        "Total Task Duration (5 mins)",
+        "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes",
+        Formatter.DURATION,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS);
+            }
+        })),
+
+    FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>(
+        "flowFilesRemoved",
+        "FlowFiles Removed (5 mins)",
+        "The total number of FlowFiles removed by this Processor in the last 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getFlowFilesRemoved());
+            }
+        })),
+
+    AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>(
+        "averageLineageDuration",
+        "Average Lineage Duration (5 mins)",
+        "The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.",
+        Formatter.DURATION,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
+            }
+        },
+        new ValueReducer<StatusSnapshot, Long>() {
+            /**
+             * Combines per-snapshot averages into one weighted average, weighting each
+             * snapshot by the number of FlowFiles it accounts for (removed + transferred out).
+             */
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                // Both accumulators are long: the snapshot metrics are longs and "int += long"
+                // compiles to a silent narrowing cast.
+                long millis = 0L;
+                long count = 0L;
+
+                for (final StatusSnapshot snapshot : values) {
+                    final long removed = snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor()).longValue();
+                    final long transferred = snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()).longValue();
+                    final long processed = removed + transferred;
+                    count += processed;
+
+                    // The numerator must use the same weight as the denominator; weighting by
+                    // "removed" alone while dividing by removed + transferred under-reports the
+                    // average (and yields 0 for processors that never remove FlowFiles).
+                    // NOTE(review): assumes the per-snapshot average covers removed and
+                    // transferred FlowFiles alike - confirm against where ProcessorStatus is populated.
+                    final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
+                    millis += avgMillis * processed;
+                }
+
+                return count == 0L ? 0L : millis / count;
+            }
+        })),
+
+    AVERAGE_TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
+        "averageTaskMillis",
+        "Average Task Duration",
+        "The average duration it took this Processor to complete a task, as averaged over the past 5 minutes",
+        Formatter.DURATION,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getInvocations() == 0 ? 0 : TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS) / status.getInvocations();
+            }
+        },
+        new ValueReducer<StatusSnapshot, Long>() {
+            /**
+             * Divides the summed task duration of all snapshots by their summed task count;
+             * returns 0 when no task ran.
+             */
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                long procMillis = 0L;
+                // long rather than int so that the summed task count cannot be truncated.
+                long invocations = 0L;
+
+                for (final StatusSnapshot snapshot : values) {
+                    procMillis += snapshot.getStatusMetrics().get(TASK_MILLIS.getDescriptor()).longValue();
+                    invocations += snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()).longValue();
+                }
+
+                if (invocations == 0L) {
+                    return 0L;
+                }
+
+                return procMillis / invocations;
+            }
+        }));
+
+    private final MetricDescriptor<ProcessorStatus> descriptor;
+
+    private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /**
+     * @return the field name reported by the wrapped descriptor
+     */
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    /**
+     * @return the metric descriptor wrapped by this constant
+     */
+    public MetricDescriptor<ProcessorStatus> getDescriptor() {
+        return descriptor;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java new file mode 100644 index 0000000..0499d65 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+/**
+ * Status-history metrics that are read from a {@link RemoteProcessGroupStatus}.
+ *
+ * <p>Each constant wraps a {@link StandardMetricDescriptor} whose {@link ValueMapper} extracts the
+ * metric value from a single {@code RemoteProcessGroupStatus}. {@link #AVERAGE_LINEAGE_DURATION}
+ * additionally supplies a {@link ValueReducer} that combines several {@link StatusSnapshot}s.</p>
+ *
+ * <p>The per-second rates divide a byte total by 300 - presumably the number of seconds in the
+ * 5-minute window named in the descriptions; confirm the capture window before changing it.</p>
+ */
+public enum RemoteProcessGroupStatusDescriptor {
+    SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
+        "sentBytes",
+        "Bytes Sent (5 mins)",
+        "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return status.getSentContentSize();
+            }
+        })),
+
+    SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
+        "sentCount",
+        "FlowFiles Sent (5 mins)",
+        "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getSentCount().longValue());
+            }
+        })),
+
+    RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
+        "receivedBytes",
+        "Bytes Received (5 mins)",
+        "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return status.getReceivedContentSize();
+            }
+        })),
+
+    RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
+        "receivedCount",
+        "FlowFiles Received (5 mins)",
+        "The number of FlowFiles that have been received from the remote system in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getReceivedCount().longValue());
+            }
+        })),
+
+    RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
+        "receivedBytesPerSecond",
+        "Received Bytes Per Second",
+        "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
+        Formatter.DATA_SIZE,
+        new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getReceivedContentSize().longValue() / 300L);
+            }
+        })),
+
+    // The description previously repeated the "received from" wording of the metric above,
+    // although the value is computed from the sent content size.
+    SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
+        "sentBytesPerSecond",
+        "Sent Bytes Per Second",
+        "The data rate at which data was sent to the remote system in the past 5 minutes in terms of Bytes Per Second",
+        Formatter.DATA_SIZE,
+        new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getSentContentSize().longValue() / 300L);
+            }
+        })),
+
+    TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
+        "totalBytesPerSecond",
+        "Total Bytes Per Second",
+        "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second",
+        Formatter.DATA_SIZE,
+        new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L);
+            }
+        })),
+
+    AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
+        "averageLineageDuration",
+        "Average Lineage Duration (5 mins)",
+        "The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.",
+        Formatter.DURATION,
+        new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
+            }
+        },
+        new ValueReducer<StatusSnapshot, Long>() {
+            /**
+             * Combines per-snapshot averages into one average weighted by each snapshot's
+             * sent count; returns 0 when nothing was sent.
+             */
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                long millis = 0L;
+                // long rather than int: "int += long" compiles to a silent narrowing cast.
+                long count = 0L;
+
+                for (final StatusSnapshot snapshot : values) {
+                    final long sent = snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor()).longValue();
+                    count += sent;
+
+                    final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
+                    millis += avgMillis * sent;
+                }
+
+                return count == 0L ? 0L : millis / count;
+            }
+        }));
+
+    private final MetricDescriptor<RemoteProcessGroupStatus> descriptor;
+
+    private RemoteProcessGroupStatusDescriptor(final MetricDescriptor<RemoteProcessGroupStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /**
+     * @return the field name reported by the wrapped descriptor
+     */
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    /**
+     * @return the metric descriptor wrapped by this constant
+     */
+    public MetricDescriptor<RemoteProcessGroupStatus> getDescriptor() {
+        return descriptor;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
index 014b0a6..2b0cd9e 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java @@ -44,9 +44,9 @@ public class StatusHistoryUtil { final StatusHistoryDTO dto = new StatusHistoryDTO(); dto.setGenerated(new Date()); - dto.setDetails(componentDetails); + dto.setComponentDetails(componentDetails); dto.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(metricDescriptors)); - dto.setStatusSnapshots(snapshotDtos); + dto.setAggregateStatusSnapshots(snapshotDtos); return dto; } http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java index 02f7d6b..36288d5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java @@ -20,19 +20,16 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; -import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.status.ConnectionStatus; import 
org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; -import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; import org.apache.nifi.util.ComponentStatusReport; -import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ComponentStatusReport.ComponentType; +import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.RingBuffer; import org.apache.nifi.util.RingBuffer.ForEachEvaluator; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,22 +230,9 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit return history; } - private static long calculateTaskMillis(final ProcessGroupStatus status) { - long nanos = 0L; - for (final ProcessorStatus procStatus : status.getProcessorStatus()) { - nanos += procStatus.getProcessingNanos(); - } - - for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) { - nanos += calculateTaskMillis(childStatus); - } - - return TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS); - } private static class Capture { - private final Date captureDate; private final ComponentStatusReport statusReport; @@ -266,407 +250,7 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit } } - public static enum RemoteProcessGroupStatusDescriptor { - - SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)", - "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return status.getSentContentSize(); - } - })), - SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)", - "The number of FlowFiles that 
have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getSentCount().longValue()); - } - })), - RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)", - "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return status.getReceivedContentSize(); - } - })), - RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)", - "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getReceivedCount().longValue()); - } - })), - RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second", - "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", - Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf(status.getReceivedContentSize().longValue() / 300L); - } - })), - SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second", - "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final 
RemoteProcessGroupStatus status) { - return Long.valueOf(status.getSentContentSize().longValue() / 300L); - } - })), - TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second", - "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second", - Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L); - } - })), - AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>( - "averageLineageDuration", - "Average Lineage Duration (5 mins)", - "The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.", - Formatter.DURATION, - new ValueMapper<RemoteProcessGroupStatus>() { - @Override - public Long getValue(final RemoteProcessGroupStatus status) { - return status.getAverageLineageDuration(TimeUnit.MILLISECONDS); - } - }, new ValueReducer<StatusSnapshot, Long>() { - @Override - public Long reduce(final List<StatusSnapshot> values) { - long millis = 0L; - int count = 0; - - for (final StatusSnapshot snapshot : values) { - final long sent = snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor()).longValue(); - count += sent; - - final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue(); - final long totalMillis = avgMillis * sent; - millis += totalMillis; - } - - return count == 0 ? 
0 : millis / count; - } - } - )); - - private final MetricDescriptor<RemoteProcessGroupStatus> descriptor; - - private RemoteProcessGroupStatusDescriptor(final MetricDescriptor<RemoteProcessGroupStatus> descriptor) { - this.descriptor = descriptor; - } - - public String getField() { - return descriptor.getField(); - } - - public MetricDescriptor<RemoteProcessGroupStatus> getDescriptor() { - return descriptor; - } - } - - public static enum ProcessGroupStatusDescriptor { - - BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)", - "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getBytesRead(); - } - })), - BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)", - "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getBytesWritten(); - } - })), - BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)", - "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes", - Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getBytesRead() + status.getBytesWritten(); - } - })), - INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)", - "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", - Formatter.DATA_SIZE, new 
ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getInputContentSize(); - } - })), - INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)", - "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", - Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getInputCount().longValue(); - } - })), - OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)", - "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", - Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getOutputContentSize(); - } - })), - OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)", - "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", - Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getOutputCount().longValue(); - } - })), - QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes", - "The cumulative size of all FlowFiles queued in all Connections of this Process Group", - Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return status.getQueuedContentSize(); - } - })), - QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count", - "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final 
ProcessGroupStatus status) { - return status.getQueuedCount().longValue(); - } - })), - TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)", - "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes", - Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() { - @Override - public Long getValue(final ProcessGroupStatus status) { - return calculateTaskMillis(status); - } - })); - - private MetricDescriptor<ProcessGroupStatus> descriptor; - - private ProcessGroupStatusDescriptor(final MetricDescriptor<ProcessGroupStatus> descriptor) { - this.descriptor = descriptor; - } - - public String getField() { - return descriptor.getField(); - } - - public MetricDescriptor<ProcessGroupStatus> getDescriptor() { - return descriptor; - } - } - - public static enum ConnectionStatusDescriptor { - - INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)", - "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return status.getInputBytes(); - } - })), - INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 mins)", - "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return Long.valueOf(status.getInputCount()); - } - })), - OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)", - "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus 
status) { - return status.getOutputBytes(); - } - })), - OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 mins)", - "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return Long.valueOf(status.getOutputCount()); - } - })), - QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes", - "The number of Bytes queued in this Connection", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return status.getQueuedBytes(); - } - })), - QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count", - "The number of FlowFiles queued in this Connection", Formatter.COUNT, new ValueMapper<ConnectionStatus>() { - @Override - public Long getValue(final ConnectionStatus status) { - return Long.valueOf(status.getQueuedCount()); - } - })); - - private MetricDescriptor<ConnectionStatus> descriptor; - - private ConnectionStatusDescriptor(final MetricDescriptor<ConnectionStatus> descriptor) { - this.descriptor = descriptor; - } - - public String getField() { - return descriptor.getField(); - } - - public MetricDescriptor<ConnectionStatus> getDescriptor() { - return descriptor; - } - } - public static enum ProcessorStatusDescriptor { - - BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", "Bytes Read (5 mins)", - "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getBytesRead(); - } - })), - BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 mins)", - "The total number of bytes written to the Content 
Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getBytesWritten(); - } - })), - BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes Transferred (5 mins)", - "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getBytesRead() + status.getBytesWritten(); - } - })), - INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)", - "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getInputBytes(); - } - })), - INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 mins)", - "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getInputCount()); - } - })), - OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)", - "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getOutputBytes(); - } - })), - OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 mins)", - "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 
minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getOutputCount()); - } - })), - TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes", - Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getInvocations()); - } - })), - TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 mins)", - "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", Formatter.DURATION, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS); - } - })), - FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles Removed (5 mins)", - "The total number of FlowFiles removed by this Processor in the last 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return Long.valueOf(status.getFlowFilesRemoved()); - } - })), - AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>( - "averageLineageDuration", - "Average Lineage Duration (5 mins)", - "The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.", - Formatter.DURATION, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getAverageLineageDuration(TimeUnit.MILLISECONDS); - } - }, new ValueReducer<StatusSnapshot, Long>() { - @Override - public Long reduce(final List<StatusSnapshot> values) { - long millis = 0L; - 
int count = 0; - - for (final StatusSnapshot snapshot : values) { - final long removed = snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor()).longValue(); - count += removed; - - count += snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()).longValue(); - - final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue(); - final long totalMillis = avgMillis * removed; - millis += totalMillis; - } - - return count == 0 ? 0 : millis / count; - } - } - )), - AVERAGE_TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>( - "averageTaskMillis", - "Average Task Duration", - "The average duration it took this Processor to complete a task, as averaged over the past 5 minutes", - Formatter.DURATION, - new ValueMapper<ProcessorStatus>() { - @Override - public Long getValue(final ProcessorStatus status) { - return status.getInvocations() == 0 ? 0 : TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS) / status.getInvocations(); - } - }, - new ValueReducer<StatusSnapshot, Long>() { - @Override - public Long reduce(final List<StatusSnapshot> values) { - long procMillis = 0L; - int invocations = 0; - - for (final StatusSnapshot snapshot : values) { - procMillis += snapshot.getStatusMetrics().get(TASK_MILLIS.getDescriptor()).longValue(); - invocations += snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()).intValue(); - } - - if (invocations == 0) { - return 0L; - } - - return procMillis / invocations; - } - } - )); - - private MetricDescriptor<ProcessorStatus> descriptor; - - private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor) { - this.descriptor = descriptor; - } - - public String getField() { - return descriptor.getField(); - } - - public MetricDescriptor<ProcessorStatus> getDescriptor() { - return descriptor; - } - } @Override public List<MetricDescriptor<ConnectionStatus>> getConnectionMetricDescriptors() { 
http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java index d3cfd9e..ad9208e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/NodeBulletinProcessingStrategy.java @@ -17,50 +17,24 @@ package org.apache.nifi.events; import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.nifi.reporting.Bulletin; /** * */ public class NodeBulletinProcessingStrategy implements BulletinProcessingStrategy { - - private final Lock lock; - private final Set<Bulletin> bulletins; - - public NodeBulletinProcessingStrategy() { - lock = new ReentrantLock(); - bulletins = new LinkedHashSet<>(); - } + private final CircularFifoQueue<Bulletin> ringBuffer = new CircularFifoQueue<>(5); @Override - public void update(final Bulletin bulletin) { - lock.lock(); - try { - bulletins.add(bulletin); - } finally { - lock.unlock(); - } + public synchronized void update(final Bulletin bulletin) { + ringBuffer.add(bulletin); } - public Set<Bulletin> getBulletins() { - final Set<Bulletin> response = new HashSet<>(); - - lock.lock(); - try { - // get all the bulletins 
currently stored - response.addAll(bulletins); - - // remove the bulletins - bulletins.clear(); - } finally { - lock.unlock(); - } - + public synchronized Set<Bulletin> getBulletins() { + final Set<Bulletin> response = new HashSet<>(ringBuffer); return response; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java index 8aeb34d..7202546 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java @@ -19,7 +19,6 @@ package org.apache.nifi.events; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -207,25 +206,6 @@ public class VolatileBulletinRepository implements BulletinRepository { return controllerBulletins; } - /** - * Overrides the default bulletin processing strategy. When a custom - * bulletin strategy is employed, bulletins will not be persisted in this - * repository and will sent to the specified strategy instead. 
- * - * @param strategy bulletin strategy - */ - public void overrideDefaultBulletinProcessing(final BulletinProcessingStrategy strategy) { - Objects.requireNonNull(strategy); - this.processingStrategy = strategy; - } - - /** - * Restores the default bulletin processing strategy. - */ - public void restoreDefaultBulletinProcessing() { - this.processingStrategy = new DefaultBulletinProcessingStrategy(); - } - private List<RingBuffer<Bulletin>> getBulletinBuffers(final Bulletin bulletin) { final String storageKey = getBulletinStoreKey(bulletin); http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index bc5245c..a290fec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -67,7 +67,6 @@ import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.dto.status.NodeStatusDTO; import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO; @@ -1538,11 +1537,6 @@ public interface 
NiFiServiceFacade { */ ClusterProcessorStatusDTO getClusterProcessorStatus(String processorId); - /** - * @param processorId id - * @return the processor status history for each node connected to the cluster - */ - ClusterStatusHistoryDTO getClusterProcessorStatusHistory(String processorId); /** * Returns a connection's status for each node connected to the cluster. @@ -1552,17 +1546,6 @@ public interface NiFiServiceFacade { */ ClusterConnectionStatusDTO getClusterConnectionStatus(String connectionId); - /** - * @param connectionId id - * @return the connection status history for each node connected to the cluster - */ - ClusterStatusHistoryDTO getClusterConnectionStatusHistory(String connectionId); - - /** - * @param processGroupId id - * @return the process group status history for each node connected to the cluster - */ - ClusterStatusHistoryDTO getClusterProcessGroupStatusHistory(String processGroupId); /** * Returns a process group's status for each node connected to the cluster. @@ -1572,13 +1555,6 @@ public interface NiFiServiceFacade { */ ClusterProcessGroupStatusDTO getClusterProcessGroupStatus(String processorId); - /** - * Returns the remote process group status history for each node connected to the cluster. - * - * @param remoteProcessGroupId a remote process group identifier - * @return The cluster status history - */ - ClusterStatusHistoryDTO getClusterRemoteProcessGroupStatusHistory(String remoteProcessGroupId); /** * Returns a remote process group's status for each node connected to the cluster. 
http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 945c671..098fd64 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -16,6 +16,25 @@ */ package org.apache.nifi.web; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.WebApplicationException; + import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; @@ -72,7 +91,6 @@ import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinQuery; import org.apache.nifi.reporting.BulletinRepository; -import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.user.AccountStatus; import org.apache.nifi.user.NiFiUser; import org.apache.nifi.user.NiFiUserGroup; @@ -131,7 +149,6 @@ 
import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; import org.apache.nifi.web.api.dto.status.ControllerStatusDTO; import org.apache.nifi.web.api.dto.status.NodeConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.NodePortStatusDTO; @@ -160,24 +177,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.access.AccessDeniedException; -import javax.ws.rs.WebApplicationException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - /** * Implementation of NiFiServiceFacade that performs revision checking. 
*/ @@ -2105,78 +2104,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { @Override public ProcessGroupStatusDTO getProcessGroupStatus(String groupId) { - ProcessGroupStatusDTO statusReport; - if (properties.isClusterManager()) { - final ProcessGroupStatus mergedProcessGroupStatus = clusterManager.getProcessGroupStatus(groupId); - if (mergedProcessGroupStatus == null) { - throw new ResourceNotFoundException(String.format("Unable to find status for process group %s.", groupId)); - } - statusReport = dtoFactory.createProcessGroupStatusDto(clusterManager.getBulletinRepository(), mergedProcessGroupStatus); - } else { - statusReport = controllerFacade.getProcessGroupStatus(groupId); - } - return statusReport; + return controllerFacade.getProcessGroupStatus(groupId); } @Override public ControllerStatusDTO getControllerStatus() { - final ControllerStatusDTO controllerStatus; - - if (properties.isClusterManager()) { - final Set<Node> connectedNodes = clusterManager.getNodes(Node.Status.CONNECTED); - - if (connectedNodes.isEmpty()) { - throw new NoConnectedNodesException(); - } - - int activeThreadCount = 0; - long totalFlowFileObjectCount = 0; - long totalFlowFileByteCount = 0; - for (final Node node : connectedNodes) { - final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload(); - if (nodeHeartbeatPayload == null) { - continue; - } - - activeThreadCount += nodeHeartbeatPayload.getActiveThreadCount(); - totalFlowFileObjectCount += nodeHeartbeatPayload.getTotalFlowFileCount(); - totalFlowFileByteCount += nodeHeartbeatPayload.getTotalFlowFileBytes(); - } - - controllerStatus = new ControllerStatusDTO(); - controllerStatus.setActiveThreadCount(activeThreadCount); - controllerStatus.setQueued(FormatUtils.formatCount(totalFlowFileObjectCount) + " / " + FormatUtils.formatDataSize(totalFlowFileByteCount)); - - final int numNodes = clusterManager.getNodeIds().size(); - controllerStatus.setConnectedNodes(connectedNodes.size() + " / " + 
numNodes); - - // get the bulletins for the controller - final BulletinRepository bulletinRepository = clusterManager.getBulletinRepository(); - controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController())); - - // get the controller service bulletins - final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build(); - controllerStatus.setControllerServiceBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(controllerServiceQuery))); - - // get the reporting task bulletins - final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build(); - controllerStatus.setReportingTaskBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(reportingTaskQuery))); - - // get the component counts by extracting them from the roots' group status - final ProcessGroupStatus status = clusterManager.getProcessGroupStatus("root"); - if (status != null) { - final ProcessGroupCounts counts = extractProcessGroupCounts(status); - controllerStatus.setRunningCount(counts.getRunningCount()); - controllerStatus.setStoppedCount(counts.getStoppedCount()); - controllerStatus.setInvalidCount(counts.getInvalidCount()); - controllerStatus.setDisabledCount(counts.getDisabledCount()); - controllerStatus.setActiveRemotePortCount(counts.getActiveRemotePortCount()); - controllerStatus.setInactiveRemotePortCount(counts.getInactiveRemotePortCount()); - } - } else { - // get the controller status - controllerStatus = controllerFacade.getControllerStatus(); - } + // get the controller status + final ControllerStatusDTO controllerStatus = controllerFacade.getControllerStatus(); // determine if there are any pending user accounts - only include if appropriate if (NiFiUserUtils.getAuthorities().contains(Authority.ROLE_ADMIN.toString())) { @@ -3371,25 +3305,6 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade { return clusterRemoteProcessGroupStatusDto; } - @Override - public ClusterStatusHistoryDTO getClusterProcessorStatusHistory(String processorId) { - return clusterManager.getProcessorStatusHistory(processorId); - } - - @Override - public ClusterStatusHistoryDTO getClusterConnectionStatusHistory(String connectionId) { - return clusterManager.getConnectionStatusHistory(connectionId); - } - - @Override - public ClusterStatusHistoryDTO getClusterProcessGroupStatusHistory(String processGroupId) { - return clusterManager.getProcessGroupStatusHistory(processGroupId); - } - - @Override - public ClusterStatusHistoryDTO getClusterRemoteProcessGroupStatusHistory(String remoteProcessGroupId) { - return clusterManager.getRemoteProcessGroupStatusHistory(remoteProcessGroupId); - } @Override public NodeStatusDTO getNodeStatus(String nodeId) { http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java index 857df56..3086ab4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ClusterResource.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.web.api; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; @@ -25,6 +27,7 @@ import javax.ws.rs.DefaultValue; import javax.ws.rs.FormParam; import javax.ws.rs.GET; import 
javax.ws.rs.HEAD; +import javax.ws.rs.HttpMethod; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -34,6 +37,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.cluster.node.Node; import org.apache.nifi.util.NiFiProperties; @@ -49,23 +53,22 @@ import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.search.NodeSearchResultDTO; import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO; +import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO; import org.apache.nifi.web.api.dto.status.ClusterStatusDTO; -import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; import org.apache.nifi.web.api.entity.ClusterConnectionStatusEntity; import org.apache.nifi.web.api.entity.ClusterEntity; import org.apache.nifi.web.api.entity.ClusterPortStatusEntity; +import org.apache.nifi.web.api.entity.ClusterProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ClusterProcessorStatusEntity; import org.apache.nifi.web.api.entity.ClusterRemoteProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity; import org.apache.nifi.web.api.entity.ClusterStatusEntity; -import org.apache.nifi.web.api.entity.ClusterStatusHistoryEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.apache.nifi.web.api.entity.StatusHistoryEntity; import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.LongParameter; - -import org.apache.commons.lang3.StringUtils; import org.springframework.security.access.prepost.PreAuthorize; import 
com.sun.jersey.api.core.ResourceContext; @@ -75,8 +78,6 @@ import com.wordnik.swagger.annotations.ApiParam; import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; -import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; -import org.apache.nifi.web.api.entity.ClusterProcessGroupStatusEntity; /** * RESTful endpoint for managing a cluster. @@ -92,6 +93,7 @@ public class ClusterResource extends ApplicationResource { private ResourceContext resourceContext; private NiFiServiceFacade serviceFacade; private NiFiProperties properties; + private WebClusterManager clusterManager; /** * Locates the ClusterConnection sub-resource. @@ -595,7 +597,7 @@ public class ClusterResource extends ApplicationResource { @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") @ApiOperation( value = "Gets processor status history across the cluster", - response = ClusterStatusHistoryEntity.class, + response = StatusHistoryEntity.class, authorizations = { @Authorization(value = "Read Only", type = "ROLE_MONITOR"), @Authorization(value = "DFM", type = "ROLE_DFM"), @@ -624,19 +626,16 @@ public class ClusterResource extends ApplicationResource { @PathParam("id") String id) { if (properties.isClusterManager()) { - final ClusterStatusHistoryDTO dto = serviceFacade.getClusterProcessorStatusHistory(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity(); - entity.setClusterStatusHistory(dto); - entity.setRevision(revision); + final URI uri; + try { + final URI originalUri = getAbsolutePath(); + final String newPath = "/nifi-api/processors/" + id + "/status/history"; + uri = new URI(originalUri.getScheme(), originalUri.getAuthority(), newPath, originalUri.getQuery(), originalUri.getFragment()); + } catch 
(final URISyntaxException use) { + throw new RuntimeException(use); + } - // generate the response - return generateOkResponse(entity).build(); + return clusterManager.applyRequest(HttpMethod.GET, uri, getRequestParameters(true), getHeaders()).getResponse(); } throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); @@ -705,67 +704,6 @@ public class ClusterResource extends ApplicationResource { } /** - * Gets the connections status history for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. - * @param id The id of the processor - * @return A clusterProcessorStatusHistoryEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/connections/{id}/status/history") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets connection status history across the cluster", - response = ClusterStatusHistoryEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. 
Retrying the same request later may be successful.") - } - ) - public Response getConnectionStatusHistory( - @ApiParam( - value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The connection id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - final ClusterStatusHistoryDTO dto = serviceFacade.getClusterConnectionStatusHistory(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity(); - entity.setClusterStatusHistory(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } - - /** * Gets the process group status for every node. * * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. @@ -827,66 +765,6 @@ public class ClusterResource extends ApplicationResource { throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); } - /** - * Gets the process group status history for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
- * @param id The id of the process group - * @return A clusterProcessGroupStatusHistoryEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/process-groups/{id}/status/history") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets process group status history across the cluster", - response = ClusterStatusHistoryEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getProcessGroupStatusHistory( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The process group id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - final ClusterStatusHistoryDTO dto = serviceFacade.getClusterProcessGroupStatusHistory(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity(); - entity.setClusterStatusHistory(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } /** * Gets the remote process group status for every node. @@ -1074,66 +952,6 @@ public class ClusterResource extends ApplicationResource { throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); } - /** - * Gets the remote process group status history for every node. - * - * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. 
- * @param id The id of the processor - * @return A clusterRemoteProcessGroupStatusHistoryEntity - */ - @GET - @Consumes(MediaType.WILDCARD) - @Produces({MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON}) - @Path("/remote-process-groups/{id}/status/history") - @PreAuthorize("hasAnyRole('ROLE_MONITOR', 'ROLE_DFM', 'ROLE_ADMIN')") - @ApiOperation( - value = "Gets the remote process group status history across the cluster", - response = ClusterStatusHistoryEntity.class, - authorizations = { - @Authorization(value = "Read Only", type = "ROLE_MONITOR"), - @Authorization(value = "DFM", type = "ROLE_DFM"), - @Authorization(value = "Admin", type = "ROLE_ADMIN") - } - ) - @ApiResponses( - value = { - @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), - @ApiResponse(code = 401, message = "Client could not be authenticated."), - @ApiResponse(code = 403, message = "Client is not authorized to make this request."), - @ApiResponse(code = 404, message = "The specified resource could not be found."), - @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") - } - ) - public Response getRemoteProcessGroupStatusHistory( - @ApiParam( - value = "If the client id is not specified, new one will be generated. 
This value (whether specified or generated) is included in the response.", - required = false - ) - @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, - @ApiParam( - value = "The remote process group id.", - required = true - ) - @PathParam("id") String id) { - - if (properties.isClusterManager()) { - final ClusterStatusHistoryDTO dto = serviceFacade.getClusterRemoteProcessGroupStatusHistory(id); - - // create the revision - RevisionDTO revision = new RevisionDTO(); - revision.setClientId(clientId.getClientId()); - - // create entity - final ClusterStatusHistoryEntity entity = new ClusterStatusHistoryEntity(); - entity.setClusterStatusHistory(dto); - entity.setRevision(revision); - - // generate the response - return generateOkResponse(entity).build(); - } - - throw new IllegalClusterResourceRequestException("Only a cluster manager can process the request."); - } // setters public void setServiceFacade(NiFiServiceFacade serviceFacade) { @@ -1144,4 +962,7 @@ public class ClusterResource extends ApplicationResource { this.properties = properties; } + public void setClusterManager(WebClusterManager clusterManager) { + this.clusterManager = clusterManager; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index 738da04..ddc60cd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -16,12 +16,41 @@ */ package org.apache.nifi.web.api; -import com.wordnik.swagger.annotations.Api; -import com.wordnik.swagger.annotations.ApiOperation; -import com.wordnik.swagger.annotations.ApiParam; -import com.wordnik.swagger.annotations.ApiResponse; -import com.wordnik.swagger.annotations.ApiResponses; -import com.wordnik.swagger.annotations.Authorization; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.FormParam; +import javax.ws.rs.GET; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.context.ClusterContext; import org.apache.nifi.cluster.context.ClusterContextThreadLocal; @@ -33,7 +62,6 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.web.ConfigurationSnapshot; import org.apache.nifi.web.DownloadableContent; -import org.apache.nifi.web.IllegalClusterResourceRequestException; import 
org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.Revision; import org.apache.nifi.web.api.dto.ConnectableDTO; @@ -55,43 +83,14 @@ import org.apache.nifi.web.api.request.ClientIdParameter; import org.apache.nifi.web.api.request.ConnectableTypeParameter; import org.apache.nifi.web.api.request.IntegerParameter; import org.apache.nifi.web.api.request.LongParameter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.security.access.prepost.PreAuthorize; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.FormParam; -import javax.ws.rs.GET; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; -import javax.ws.rs.core.StreamingOutput; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiParam; +import com.wordnik.swagger.annotations.ApiResponse; +import com.wordnik.swagger.annotations.ApiResponses; +import com.wordnik.swagger.annotations.Authorization; /** * RESTful endpoint for managing a Connection. 
@@ -99,8 +98,6 @@ import java.util.UUID; @Api(hidden = true) public class ConnectionResource extends ApplicationResource { - private static final Logger logger = LoggerFactory.getLogger(ConnectionResource.class); - private NiFiServiceFacade serviceFacade; private WebClusterManager clusterManager; private NiFiProperties properties; @@ -327,7 +324,7 @@ public class ConnectionResource extends ApplicationResource { // replicate if cluster manager if (properties.isClusterManager()) { - throw new IllegalClusterResourceRequestException("This request is only supported in standalone mode."); + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } // get the specified processor status history @@ -565,7 +562,7 @@ public class ConnectionResource extends ApplicationResource { headersToOverride.put("content-type", MediaType.APPLICATION_JSON); // replicate put request - return (Response) clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(connectionEntity), getHeaders(headersToOverride)).getResponse(); + return clusterManager.applyRequest(HttpMethod.PUT, putUri, updateClientId(connectionEntity), getHeaders(headersToOverride)).getResponse(); } // get the connection http://git-wip-us.apache.org/repos/asf/nifi/blob/ad32cb82/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java index ef62a62..36b25f2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerResource.java @@ -524,6 +524,10 @@ public class ControllerResource extends ApplicationResource { ) @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId) { + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + final ControllerStatusDTO controllerStatus = serviceFacade.getControllerStatus(); // create the revision
