This is an automated email from the ASF dual-hosted git repository. aichrist pushed a commit to branch analytics-framework in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 6ea33126274fc921e53e59bf39d2db4ba21a7898 Author: Yolanda Davis <[email protected]> AuthorDate: Wed Jul 31 13:31:24 2019 -0400 NIFI-6150 Added tests for connection status analytics class, corrected variable names (cherry picked from commit 58c7c81) --- .../analytics/ConnectionStatusAnalytics.java | 6 +- .../analytics/TestConnectionStatusAnalytics.java | 159 +++++++++++++++++++++ 2 files changed, 162 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java index 8b7964e..6b831fa 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/ConnectionStatusAnalytics.java @@ -250,7 +250,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { private final ExtractFunction extract = (metric, statusHistory) -> { - List<Double> counts = new ArrayList<>(); + List<Double> values = new ArrayList<>(); List<Double> times = new ArrayList<>(); StatusHistoryDTO statusHistoryDTO = StatusHistoryUtil.createStatusHistoryDTO(statusHistory); @@ -258,10 +258,10 @@ public class ConnectionStatusAnalytics implements StatusAnalytics { for (StatusSnapshotDTO snap : statusHistoryDTO.getAggregateSnapshots()) { Long snapValue = snap.getStatusMetrics().get(metric); long snapTime = snap.getTimestamp().getTime(); - counts.add((double) snapValue); + values.add((double) snapValue); times.add((double) snapTime); } - return new Tuple<>(times.stream(), counts.stream()); + return new Tuple<>(times.stream(), values.stream()); }; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java new file mode 100644 index 0000000..e17ae9a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/analytics/TestConnectionStatusAnalytics.java @@ -0,0 +1,159 @@ +package org.apache.nifi.controller.status.analytics; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.status.history.ComponentStatusRepository; +import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; +import org.apache.nifi.controller.status.history.MetricDescriptor; +import org.apache.nifi.controller.status.history.StandardStatusSnapshot; +import org.apache.nifi.controller.status.history.StatusHistory; +import org.apache.nifi.controller.status.history.StatusSnapshot; +import org.apache.nifi.groups.ProcessGroup; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestConnectionStatusAnalytics { + + private static final Set<MetricDescriptor<?>> CONNECTION_METRICS = Arrays.stream(ConnectionStatusDescriptor.values()) + .map(ConnectionStatusDescriptor::getDescriptor) + .collect(Collectors.toSet()); + + protected ConnectionStatusAnalytics getConnectionStatusAnalytics(Long queuedBytes, Long queuedCount,String backPressureDataSizeThreshhold, Long backPressureObjectThreshold, Boolean isConstantStatus){ + ComponentStatusRepository statusRepository = Mockito.mock(ComponentStatusRepository.class); + FlowManager flowManager;flowManager = Mockito.mock(FlowManager.class); + final ProcessGroup processGroup = Mockito.mock(ProcessGroup.class); + final StatusHistory statusHistory = Mockito.mock(StatusHistory.class); + final Connection connection = Mockito.mock(Connection.class); + final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); + final List<Connection> connections = new ArrayList<>(); + final String connectionIdentifier = "1"; + connections.add(connection); + + List<StatusSnapshot> snapshotList = new ArrayList<>(); + final long startTime = System.currentTimeMillis(); + int iterations = 10; + + for (int i=0; i < iterations; i++) { + final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(CONNECTION_METRICS); + snapshot.setTimestamp(new Date(startTime + i * 1000)); + snapshot.addStatusMetric(ConnectionStatusDescriptor.QUEUED_BYTES.getDescriptor(), (isConstantStatus || i < 5) ? queuedBytes: queuedBytes * 2); + snapshot.addStatusMetric(ConnectionStatusDescriptor.QUEUED_COUNT.getDescriptor(), (isConstantStatus || i < 5) ? queuedCount: queuedCount * 2); + snapshotList.add(snapshot); + } + + when(flowFileQueue.getBackPressureDataSizeThreshold()).thenReturn(backPressureDataSizeThreshhold); + when(flowFileQueue.getBackPressureObjectThreshold()).thenReturn(backPressureObjectThreshold); + when(connection.getIdentifier()).thenReturn(connectionIdentifier); + when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); + when(processGroup.findAllConnections()).thenReturn(connections); + when(statusHistory.getStatusSnapshots()).thenReturn(snapshotList); + when(flowManager.getRootGroup()).thenReturn(processGroup); + when(statusRepository.getConnectionStatusHistory(anyString(),any(),any(),anyInt())).thenReturn(statusHistory); + ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository,flowManager,connectionIdentifier,false); + connectionStatusAnalytics.init(); + return connectionStatusAnalytics; + } + + @Test + public void testGetIntervalTimeMillis(){ + ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true); + Long interval = connectionStatusAnalytics.getIntervalTimeMillis(); + assertNotNull(interval); + assert(interval == 300000); + } + + @Test + public void testGetTimeToCountBackpressureMillisConstantStatus(){ + ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true); + Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis(); + assertNotNull(countTime); + assert(countTime == -1L); + } + + @Test + public void testGetTimeToCountBackpressureMillisVaryingStatus(){ + ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false); + Long countTime = connectionStatusAnalytics.getTimeToCountBackpressureMillis(); + assertNotNull(countTime); + assert(countTime > -1L); + } + + @Test + public void testGetTimeToBytesBackpressureMillisConstantStatus(){ + ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true); + Long bytesTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis(); + assertNotNull(bytesTime); + assert(bytesTime == -1L); + } + + @Test + public void testGetTimeToBytesBackpressureMillisVaryingStatus(){ + ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false); + Long bytesTime = connectionStatusAnalytics.getTimeToBytesBackpressureMillis(); + assertNotNull(bytesTime); + assert(bytesTime > -1L); + } + + @Test + public void testGetNextIntervalBytesConstantStatus(){ + ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true); + Long nextBytes = connectionStatusAnalytics.getNextIntervalBytes(); + assertNotNull(nextBytes); + assert(nextBytes == 5000L); + } + + @Test + public void testGetNextIntervalBytesVaryingStatus(){ + ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, false); + Long nextBytes = connectionStatusAnalytics.getNextIntervalBytes(); + assertNotNull(nextBytes); + assert(nextBytes > -1L); + } + + @Test + public void testGetNextIntervalCountConstantStatus(){ + ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true); + Long nextCount = connectionStatusAnalytics.getNextIntervalCount(); + assertNotNull(nextCount); + assert(nextCount == 50L); + } + + @Test + public void testGetNextIntervalCountVaryingStatus(){ + ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true); + Long nextCount = connectionStatusAnalytics.getNextIntervalCount(); + assertNotNull(nextCount); + assert(nextCount == 50L); + } + + @Test + public void testGetNextIntervalPercentageUseBytesConstantStatus(){ + ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(50000L, 50L, "1MB", 100L, true); + Long nextBytesPercentage = connectionStatusAnalytics.getNextIntervalPercentageUseBytes(); + assertNotNull(nextBytesPercentage); + assert(nextBytesPercentage == 5); + } + + @Test + public void testGetNextIntervalPercentageUseCountConstantStatus(){ + ConnectionStatusAnalytics connectionStatusAnalytics = getConnectionStatusAnalytics(5000L, 50L, "100MB", 100L, true); + Long nextCountPercentage = connectionStatusAnalytics.getNextIntervalPercentageUseCount(); + assertNotNull(nextCountPercentage); + assert(nextCountPercentage == 50); + } + +}
