http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/status/StatusRequestParser.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/status/StatusRequestParser.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/status/StatusRequestParser.java new file mode 100644 index 0000000..061120c --- /dev/null +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/status/StatusRequestParser.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.minifi.status; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.diagnostics.GarbageCollection; +import org.apache.nifi.diagnostics.StorageUsage; +import org.apache.nifi.diagnostics.SystemDiagnostics; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.minifi.commons.status.common.BulletinStatus; +import org.apache.nifi.minifi.commons.status.common.ValidationError; +import org.apache.nifi.minifi.commons.status.connection.ConnectionHealth; +import org.apache.nifi.minifi.commons.status.connection.ConnectionStats; +import org.apache.nifi.minifi.commons.status.connection.ConnectionStatusBean; +import org.apache.nifi.minifi.commons.status.controllerservice.ControllerServiceHealth; +import org.apache.nifi.minifi.commons.status.controllerservice.ControllerServiceStatus; +import org.apache.nifi.minifi.commons.status.instance.InstanceHealth; +import org.apache.nifi.minifi.commons.status.instance.InstanceStats; +import org.apache.nifi.minifi.commons.status.instance.InstanceStatus; +import org.apache.nifi.minifi.commons.status.processor.ProcessorHealth; +import org.apache.nifi.minifi.commons.status.processor.ProcessorStats; +import org.apache.nifi.minifi.commons.status.processor.ProcessorStatusBean; +import org.apache.nifi.minifi.commons.status.reportingTask.ReportingTaskHealth; +import org.apache.nifi.minifi.commons.status.reportingTask.ReportingTaskStatus; +import org.apache.nifi.minifi.commons.status.rpg.InputPortStatus; +import 
org.apache.nifi.minifi.commons.status.rpg.RemoteProcessGroupHealth; +import org.apache.nifi.minifi.commons.status.rpg.RemoteProcessGroupStats; +import org.apache.nifi.minifi.commons.status.rpg.RemoteProcessGroupStatusBean; +import org.apache.nifi.minifi.commons.status.system.ContentRepositoryUsage; +import org.apache.nifi.minifi.commons.status.system.FlowfileRepositoryUsage; +import org.apache.nifi.minifi.commons.status.system.GarbageCollectionStatus; +import org.apache.nifi.minifi.commons.status.system.HeapStatus; +import org.apache.nifi.minifi.commons.status.system.SystemDiagnosticsStatus; +import org.apache.nifi.minifi.commons.status.system.SystemProcessorStats; +import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinQuery; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public final class StatusRequestParser { + private StatusRequestParser() { + } + + static ProcessorStatusBean parseProcessorStatusRequest(ProcessorStatus inputProcessorStatus, String statusTypes, FlowController flowController, Collection<ValidationResult> validationResults) { + ProcessorStatusBean processorStatusBean = new ProcessorStatusBean(); + processorStatusBean.setName(inputProcessorStatus.getName()); + + String[] statusSplits = statusTypes.split(","); + List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletins( + new BulletinQuery.Builder() + .sourceIdMatches(inputProcessorStatus.getId()) + .build()); + + for (String statusType : statusSplits) { + switch (statusType.toLowerCase().trim()) { + case "health": + ProcessorHealth processorHealth = new ProcessorHealth(); + + processorHealth.setRunStatus(inputProcessorStatus.getRunStatus().name()); + processorHealth.setHasBulletins(!bulletinList.isEmpty()); + 
processorHealth.setValidationErrorList(transformValidationResults(validationResults)); + + processorStatusBean.setProcessorHealth(processorHealth); + break; + case "bulletins": + processorStatusBean.setBulletinList(transformBulletins(bulletinList)); + break; + case "stats": + ProcessorStats processorStats = new ProcessorStats(); + + processorStats.setActiveThreads(inputProcessorStatus.getActiveThreadCount()); + processorStats.setFlowfilesReceived(inputProcessorStatus.getFlowFilesReceived()); + processorStats.setBytesRead(inputProcessorStatus.getBytesRead()); + processorStats.setBytesWritten(inputProcessorStatus.getBytesWritten()); + processorStats.setFlowfilesSent(inputProcessorStatus.getFlowFilesSent()); + processorStats.setInvocations(inputProcessorStatus.getInvocations()); + processorStats.setProcessingNanos(inputProcessorStatus.getProcessingNanos()); + + processorStatusBean.setProcessorStats(processorStats); + break; + } + } + return processorStatusBean; + } + + static RemoteProcessGroupStatusBean parseRemoteProcessGroupStatusRequest(RemoteProcessGroupStatus inputRemoteProcessGroupStatus, String statusTypes, FlowController flowController) { + RemoteProcessGroupStatusBean remoteProcessGroupStatusBean = new RemoteProcessGroupStatusBean(); + remoteProcessGroupStatusBean.setName(inputRemoteProcessGroupStatus.getName()); + + String rootGroupId = flowController.getRootGroupId(); + String[] statusSplits = statusTypes.split(","); + + List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletins( + new BulletinQuery.Builder() + .sourceIdMatches(inputRemoteProcessGroupStatus.getId()) + .build()); + List<String> authorizationIssues = inputRemoteProcessGroupStatus.getAuthorizationIssues(); + + for (String statusType : statusSplits) { + switch (statusType.toLowerCase().trim()) { + case "health": + RemoteProcessGroupHealth remoteProcessGroupHealth = new RemoteProcessGroupHealth(); + + 
remoteProcessGroupHealth.setTransmissionStatus(inputRemoteProcessGroupStatus.getTransmissionStatus().name()); + remoteProcessGroupHealth.setHasAuthorizationIssues(!authorizationIssues.isEmpty()); + remoteProcessGroupHealth.setActivePortCount(inputRemoteProcessGroupStatus.getActiveRemotePortCount()); + remoteProcessGroupHealth.setInactivePortCount(inputRemoteProcessGroupStatus.getInactiveRemotePortCount()); + remoteProcessGroupHealth.setHasBulletins(!bulletinList.isEmpty()); + + remoteProcessGroupStatusBean.setRemoteProcessGroupHealth(remoteProcessGroupHealth); + break; + case "bulletins": + remoteProcessGroupStatusBean.setBulletinList(transformBulletins(bulletinList)); + break; + case "authorizationissues": + remoteProcessGroupStatusBean.setAuthorizationIssues(authorizationIssues); + break; + case "inputports": + List<InputPortStatus> inputPortStatusList = new LinkedList<>(); + RemoteProcessGroup remoteProcessGroup = flowController.getGroup(rootGroupId).getRemoteProcessGroup(inputRemoteProcessGroupStatus.getId()); + Set<RemoteGroupPort> inputPorts = remoteProcessGroup.getInputPorts(); + + for (RemoteGroupPort inputPort : inputPorts) { + InputPortStatus inputPortStatus = new InputPortStatus(); + + inputPortStatus.setName(inputPort.getName()); + inputPortStatus.setTargetExists(inputPort.getTargetExists()); + inputPortStatus.setTargetRunning(inputPort.isTargetRunning()); + + inputPortStatusList.add(inputPortStatus); + } + remoteProcessGroupStatusBean.setInputPortStatusList(inputPortStatusList); + break; + case "stats": + RemoteProcessGroupStats remoteProcessGroupStats = new RemoteProcessGroupStats(); + + remoteProcessGroupStats.setActiveThreads(inputRemoteProcessGroupStatus.getActiveThreadCount()); + remoteProcessGroupStats.setSentContentSize(inputRemoteProcessGroupStatus.getSentContentSize()); + remoteProcessGroupStats.setSentCount(inputRemoteProcessGroupStatus.getSentCount()); + + remoteProcessGroupStatusBean.setRemoteProcessGroupStats(remoteProcessGroupStats); + 
break; + } + } + return remoteProcessGroupStatusBean; + } + + static ConnectionStatusBean parseConnectionStatusRequest(ConnectionStatus inputConnectionStatus, String statusTypes, Logger logger) { + ConnectionStatusBean connectionStatusBean = new ConnectionStatusBean(); + connectionStatusBean.setName(inputConnectionStatus.getName()); + + String[] statusSplits = statusTypes.split(","); + for (String statusType : statusSplits) { + switch (statusType.toLowerCase().trim()) { + case "health": + ConnectionHealth connectionHealth = new ConnectionHealth(); + + connectionHealth.setQueuedBytes(inputConnectionStatus.getQueuedBytes()); + connectionHealth.setQueuedCount(inputConnectionStatus.getQueuedCount()); + + connectionStatusBean.setConnectionHealth(connectionHealth); + break; + case "stats": + ConnectionStats connectionStats = new ConnectionStats(); + + connectionStats.setInputBytes(inputConnectionStatus.getInputBytes()); + connectionStats.setInputCount(inputConnectionStatus.getInputCount()); + connectionStats.setOutputCount(inputConnectionStatus.getOutputCount()); + connectionStats.setOutputBytes(inputConnectionStatus.getOutputBytes()); + + connectionStatusBean.setConnectionStats(connectionStats); + break; + } + } + return connectionStatusBean; + } + + static ReportingTaskStatus parseReportingTaskStatusRequest(String id, ReportingTaskNode reportingTaskNode, String statusTypes, FlowController flowController, Logger logger) { + ReportingTaskStatus reportingTaskStatus = new ReportingTaskStatus(); + reportingTaskStatus.setName(id); + + String[] statusSplits = statusTypes.split(","); + List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletins( + new BulletinQuery.Builder() + .sourceIdMatches(id) + .build()); + for (String statusType : statusSplits) { + switch (statusType.toLowerCase().trim()) { + case "health": + ReportingTaskHealth reportingTaskHealth = new ReportingTaskHealth(); + + 
reportingTaskHealth.setScheduledState(reportingTaskNode.getScheduledState().name()); + reportingTaskHealth.setActiveThreads(reportingTaskNode.getActiveThreadCount()); + reportingTaskHealth.setHasBulletins(!bulletinList.isEmpty()); + + Collection<ValidationResult> validationResults = reportingTaskNode.getValidationErrors(); + reportingTaskHealth.setValidationErrorList(transformValidationResults(validationResults)); + + reportingTaskStatus.setReportingTaskHealth(reportingTaskHealth); + break; + case "bulletins": + reportingTaskStatus.setBulletinList(transformBulletins(bulletinList)); + break; + } + } + return reportingTaskStatus; + } + + static ControllerServiceStatus parseControllerServiceStatusRequest(ControllerServiceNode controllerServiceNode, String statusTypes, FlowController flowController, Logger logger) { + ControllerServiceStatus controllerServiceStatus = new ControllerServiceStatus(); + String id = controllerServiceNode.getIdentifier(); + controllerServiceStatus.setName(id); + + String[] statusSplits = statusTypes.split(","); + List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletins( + new BulletinQuery.Builder() + .sourceIdMatches(id) + .build()); + for (String statusType : statusSplits) { + switch (statusType.toLowerCase().trim()) { + case "health": + ControllerServiceHealth controllerServiceHealth = new ControllerServiceHealth(); + + controllerServiceHealth.setState(controllerServiceNode.getState().name()); + controllerServiceHealth.setHasBulletins(!bulletinList.isEmpty()); + + Collection<ValidationResult> validationResults = controllerServiceNode.getValidationErrors(); + controllerServiceHealth.setValidationErrorList(transformValidationResults(validationResults)); + + controllerServiceStatus.setControllerServiceHealth(controllerServiceHealth); + break; + case "bulletins": + controllerServiceStatus.setBulletinList(transformBulletins(bulletinList)); + break; + } + } + return controllerServiceStatus; + } + + static 
SystemDiagnosticsStatus parseSystemDiagnosticsRequest(SystemDiagnostics inputSystemDiagnostics, String statusTypes) throws StatusRequestException { + if (inputSystemDiagnostics == null) { + throw new StatusRequestException("Unable to get system diagnostics"); + } + + SystemDiagnosticsStatus systemDiagnosticsStatus = new SystemDiagnosticsStatus(); + String[] statusSplits = statusTypes.split(","); + + for (String statusType : statusSplits) { + switch (statusType.toLowerCase().trim()) { + case "heap": + HeapStatus heapStatus = new HeapStatus(); + heapStatus.setTotalHeap(inputSystemDiagnostics.getTotalHeap()); + heapStatus.setMaxHeap(inputSystemDiagnostics.getMaxHeap()); + heapStatus.setFreeHeap(inputSystemDiagnostics.getFreeHeap()); + heapStatus.setUsedHeap(inputSystemDiagnostics.getUsedHeap()); + heapStatus.setHeapUtilization(inputSystemDiagnostics.getHeapUtilization()); + heapStatus.setTotalNonHeap(inputSystemDiagnostics.getTotalNonHeap()); + heapStatus.setMaxNonHeap(inputSystemDiagnostics.getMaxNonHeap()); + heapStatus.setFreeNonHeap(inputSystemDiagnostics.getFreeNonHeap()); + heapStatus.setUsedNonHeap(inputSystemDiagnostics.getUsedNonHeap()); + heapStatus.setNonHeapUtilization(inputSystemDiagnostics.getNonHeapUtilization()); + systemDiagnosticsStatus.setHeapStatus(heapStatus); + break; + case "processorstats": + SystemProcessorStats systemProcessorStats = new SystemProcessorStats(); + systemProcessorStats.setAvailableProcessors(inputSystemDiagnostics.getAvailableProcessors()); + systemProcessorStats.setLoadAverage(inputSystemDiagnostics.getProcessorLoadAverage()); + systemDiagnosticsStatus.setProcessorStatus(systemProcessorStats); + break; + case "contentrepositoryusage": + List<ContentRepositoryUsage> contentRepositoryUsageList = new LinkedList<>(); + Map<String, StorageUsage> contentRepoStorage = inputSystemDiagnostics.getContentRepositoryStorageUsage(); + + for (Map.Entry<String, StorageUsage> stringStorageUsageEntry : contentRepoStorage.entrySet()) { + 
ContentRepositoryUsage contentRepositoryUsage = new ContentRepositoryUsage(); + StorageUsage storageUsage = stringStorageUsageEntry.getValue(); + + contentRepositoryUsage.setName(storageUsage.getIdentifier()); + contentRepositoryUsage.setFreeSpace(storageUsage.getFreeSpace()); + contentRepositoryUsage.setTotalSpace(storageUsage.getTotalSpace()); + contentRepositoryUsage.setDiskUtilization(storageUsage.getDiskUtilization()); + contentRepositoryUsage.setUsedSpace(storageUsage.getUsedSpace()); + + contentRepositoryUsageList.add(contentRepositoryUsage); + } + systemDiagnosticsStatus.setContentRepositoryUsageList(contentRepositoryUsageList); + break; + case "flowfilerepositoryusage": + FlowfileRepositoryUsage flowfileRepositoryUsage = new FlowfileRepositoryUsage(); + StorageUsage flowFileRepoStorage = inputSystemDiagnostics.getFlowFileRepositoryStorageUsage(); + + flowfileRepositoryUsage.setFreeSpace(flowFileRepoStorage.getFreeSpace()); + flowfileRepositoryUsage.setTotalSpace(flowFileRepoStorage.getTotalSpace()); + flowfileRepositoryUsage.setDiskUtilization(flowFileRepoStorage.getDiskUtilization()); + flowfileRepositoryUsage.setUsedSpace(flowFileRepoStorage.getUsedSpace()); + + systemDiagnosticsStatus.setFlowfileRepositoryUsage(flowfileRepositoryUsage); + break; + case "garbagecollection": + List<GarbageCollectionStatus> garbageCollectionStatusList = new LinkedList<>(); + Map<String, GarbageCollection> garbageCollectionMap = inputSystemDiagnostics.getGarbageCollection(); + + for (Map.Entry<String, GarbageCollection> stringGarbageCollectionEntry : garbageCollectionMap.entrySet()) { + GarbageCollectionStatus garbageCollectionStatus = new GarbageCollectionStatus(); + GarbageCollection garbageCollection = stringGarbageCollectionEntry.getValue(); + + garbageCollectionStatus.setName(garbageCollection.getName()); + garbageCollectionStatus.setCollectionCount(garbageCollection.getCollectionCount()); + 
garbageCollectionStatus.setCollectionTime(garbageCollection.getCollectionTime()); + + garbageCollectionStatusList.add(garbageCollectionStatus); + } + systemDiagnosticsStatus.setGarbageCollectionStatusList(garbageCollectionStatusList); + break; + } + } + return systemDiagnosticsStatus; + } + + static InstanceStatus parseInstanceRequest(String statusTypes, FlowController flowController, ProcessGroupStatus rootGroupStatus) { + InstanceStatus instanceStatus = new InstanceStatus(); + + flowController.getAllControllerServices(); + List<Bulletin> bulletinList = flowController.getBulletinRepository().findBulletinsForController(); + String[] statusSplits = statusTypes.split(","); + + for (String statusType : statusSplits) { + switch (statusType.toLowerCase().trim()) { + case "health": + InstanceHealth instanceHealth = new InstanceHealth(); + + instanceHealth.setQueuedCount(rootGroupStatus.getQueuedCount()); + instanceHealth.setQueuedContentSize(rootGroupStatus.getQueuedContentSize()); + instanceHealth.setHasBulletins(!bulletinList.isEmpty()); + instanceHealth.setActiveThreads(rootGroupStatus.getActiveThreadCount()); + + instanceStatus.setInstanceHealth(instanceHealth); + break; + case "bulletins": + instanceStatus.setBulletinList(transformBulletins(flowController.getBulletinRepository().findBulletinsForController())); + break; + case "stats": + InstanceStats instanceStats = new InstanceStats(); + + instanceStats.setBytesRead(rootGroupStatus.getBytesRead()); + instanceStats.setBytesWritten(rootGroupStatus.getBytesWritten()); + instanceStats.setBytesSent(rootGroupStatus.getBytesSent()); + instanceStats.setFlowfilesSent(rootGroupStatus.getFlowFilesSent()); + instanceStats.setBytesTransferred(rootGroupStatus.getBytesTransferred()); + instanceStats.setFlowfilesTransferred(rootGroupStatus.getFlowFilesTransferred()); + instanceStats.setBytesReceived(rootGroupStatus.getBytesReceived()); + instanceStats.setFlowfilesReceived(rootGroupStatus.getFlowFilesReceived()); + + 
instanceStatus.setInstanceStats(instanceStats); + break; + } + } + return instanceStatus; + } + + private static List<ValidationError> transformValidationResults(Collection<ValidationResult> validationResults) { + List<ValidationError> validationErrorList = new LinkedList<>(); + for (ValidationResult validationResult : validationResults) { + if (!validationResult.isValid()) { + ValidationError validationError = new ValidationError(); + validationError.setSubject(validationResult.getSubject()); + validationError.setInput(validationResult.getInput()); + validationError.setReason(validationResult.getExplanation()); + + validationErrorList.add(validationError); + } + } + return validationErrorList; + } + + private static List<BulletinStatus> transformBulletins(List<Bulletin> bulletinList) { + List<BulletinStatus> bulletinStatusList = new LinkedList<>(); + if (!bulletinList.isEmpty()) { + for (Bulletin bulletin : bulletinList) { + BulletinStatus bulletinStatus = new BulletinStatus(); + bulletinStatus.setMessage(bulletin.getMessage()); + bulletinStatus.setTimestamp(bulletin.getTimestamp()); + bulletinStatusList.add(bulletinStatus); + } + } + return bulletinStatusList; + } +}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java new file mode 100644 index 0000000..027d685 --- /dev/null +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java @@ -0,0 +1,873 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.minifi.status; + +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfiguredComponent; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.RunStatus; +import org.apache.nifi.controller.status.TransmissionStatus; +import org.apache.nifi.diagnostics.GarbageCollection; +import org.apache.nifi.diagnostics.StorageUsage; +import org.apache.nifi.diagnostics.SystemDiagnostics; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.minifi.commons.status.FlowStatusReport; +import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.reporting.Bulletin; +import org.apache.nifi.reporting.BulletinQuery; +import org.apache.nifi.reporting.BulletinRepository; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addConnectionStatus; +import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addControllerServiceStatus; +import static 
org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addExpectedRemoteProcessGroupStatus; +import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addInstanceStatus; +import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addProcessorStatus; +import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addReportingTaskStatus; +import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addSystemDiagnosticStatus; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestStatusConfigReporter { + private FlowController mockFlowController; + private ProcessGroupStatus rootGroupStatus; + private BulletinRepository bulletinRepo; + private ProcessGroup processGroup; + + @Before + public void setup() { + mockFlowController = mock(FlowController.class); + rootGroupStatus = mock(ProcessGroupStatus.class); + bulletinRepo = mock(BulletinRepository.class); + processGroup = mock(ProcessGroup.class); + + when(mockFlowController.getRootGroupId()).thenReturn("root"); + when(mockFlowController.getGroupStatus("root")).thenReturn(rootGroupStatus); + when(mockFlowController.getControllerStatus()).thenReturn(rootGroupStatus); + when(mockFlowController.getBulletinRepository()).thenReturn(bulletinRepo); + when(mockFlowController.getGroup(mockFlowController.getRootGroupId())).thenReturn(processGroup); + } + + @Test + public void processorStatusHealth() throws Exception { + populateProcessor(false, false); + + String statusRequest = "processor:all:health"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); 
+ + addProcessorStatus(expected, true, false, false, false, false); + + assertEquals(expected, actual); + } + + @Test + public void processorStatusWithValidationErrors() throws Exception { + populateProcessor(true, false); + + String statusRequest = "processor:all:health"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addProcessorStatus(expected, true, true, false, false, false); + + assertEquals(expected, actual); + } + + @Test + public void processorStatusAll() throws Exception { + populateProcessor(true, true); + + String statusRequest = "processor:all:health, stats, bulletins"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addProcessorStatus(expected, true, true, true, true, true); + + assertEquals(expected, actual); + } + + @Test + public void connectionStatusHealth() throws Exception { + populateConnection(); + + String statusRequest = "connection:all:health"; + FlowStatusReport status = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addConnectionStatus(expected, true, false); + + assertEquals(expected, status); + } + + + @Test + public void connectionStatusAll() throws Exception { + populateConnection(); + + String statusRequest = "connection:all:health, stats"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + 
FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + addConnectionStatus(expected, true, true); + + assertEquals(expected, actual); + } + + @Test + public void connectionAndProcessorStatusHealth() throws Exception { + + populateConnection(); + + populateProcessor(false, false); + + String statusRequest = "connection:connectionId:health; processor:UpdateAttributeProcessorId:health"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + addConnectionStatus(expected, true, false); + + addProcessorStatus(expected, true, false, false, false, false); + + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + assertEquals(expected, actual); + } + + @Test + public void provenanceReportingTaskStatusHealth() throws Exception { + populateReportingTask(false, false); + + String statusRequest = "provenanceReporting:health"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + addReportingTaskStatus(expected, true, false, false, false); + + assertEquals(expected, actual); + } + + + @Test + public void provenanceReportingTaskStatusBulletins() throws Exception { + populateReportingTask(true, false); + + String statusRequest = "provenanceReporting:bulletins"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addReportingTaskStatus(expected, false, false, true, true); + + assertEquals(expected, actual); + } + + @Test + 
public void provenanceReportingTaskStatusAll() throws Exception { + populateReportingTask(true, true); + + String statusRequest = "provenanceReporting:health,bulletins"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addReportingTaskStatus(expected, true, true, true, true); + + assertEquals(expected, actual); + } + + @Test + public void systemDiagnosticHeap() throws Exception { + populateSystemDiagnostics(); + + String statusRequest = "systemDiagnostics:heap"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addSystemDiagnosticStatus(expected, true, false, false, false, false); + + assertEquals(expected, actual); + } + + @Test + public void systemDiagnosticProcessorStats() throws Exception { + populateSystemDiagnostics(); + + String statusRequest = "systemDiagnostics:processorStats"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addSystemDiagnosticStatus(expected, false, true, false, false, false); + + assertEquals(expected, actual); + } + + @Test + public void systemDiagnosticFlowFileRepo() throws Exception { + populateSystemDiagnostics(); + + String statusRequest = "systemDiagnostics:flowfilerepositoryusage"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new 
FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addSystemDiagnosticStatus(expected, false, false, true, false, false); + + assertEquals(expected, actual); + } + + @Test + public void systemDiagnosticContentRepo() throws Exception { + populateSystemDiagnostics(); + + String statusRequest = "systemDiagnostics:contentrepositoryusage"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addSystemDiagnosticStatus(expected, false, false, false, true, false); + + assertEquals(expected, actual); + } + + @Test + public void systemDiagnosticGarbageCollection() throws Exception { + populateSystemDiagnostics(); + + String statusRequest = "systemDiagnostics:garbagecollection"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addSystemDiagnosticStatus(expected, false, false, false, false, true); + + assertEquals(expected, actual); + } + + + @Test + public void systemDiagnosticAll() throws Exception { + populateSystemDiagnostics(); + + String statusRequest = "systemDiagnostics:garbagecollection, heap, processorstats, contentrepositoryusage, flowfilerepositoryusage"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addSystemDiagnosticStatus(expected, true, true, true, true, true); + + assertEquals(expected, actual); + } + + @Test + public void instanceStatusHealth() throws Exception { 
+ populateInstance(false); + + String statusRequest = "instance:health"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + addInstanceStatus(expected, true, false, false, false); + + assertEquals(expected, actual); + } + + @Test + public void instanceStatusBulletins() throws Exception { + populateInstance(true); + + String statusRequest = "instance:bulletins"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addInstanceStatus(expected, false, false, true, true); + + assertEquals(expected, actual); + } + + @Test + public void instanceStatusStats() throws Exception { + populateInstance(false); + + String statusRequest = "instance:stats"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addInstanceStatus(expected, false, true, false, false); + + assertEquals(expected, actual); + } + + @Test + public void instanceStatusAll() throws Exception { + populateInstance(true); + + String statusRequest = "instance:stats, bulletins, health"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addInstanceStatus(expected, true, true, true, true); + + assertEquals(expected, actual); + } + + @Test + public 
void controllerServiceStatusHealth() throws Exception { + populateControllerService(false, false); + + String statusRequest = "controllerServices:health"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addControllerServiceStatus(expected, true, false, false, false); + + assertEquals(expected, actual); + } + + @Test + public void controllerServiceStatusBulletins() throws Exception { + populateControllerService(false, true); + + String statusRequest = "controllerServices:bulletins"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addControllerServiceStatus(expected, false, false, true, true); + + assertEquals(expected, actual); + } + + @Test + public void controllerServiceStatusAll() throws Exception { + populateControllerService(true, true); + + String statusRequest = "controllerServices:bulletins, health"; + + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addControllerServiceStatus(expected, true, true, true, true); + + assertEquals(expected, actual); + } + + @Test + public void remoteProcessGroupStatusHealth() throws Exception { + populateRemoteProcessGroup(false, false); + + String statusRequest = "remoteProcessGroup:all:health"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + 
FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addExpectedRemoteProcessGroupStatus(expected, true, false, false, false, false, false); + + assertEquals(expected, actual); + } + + @Test + public void remoteProcessGroupStatusBulletins() throws Exception { + populateRemoteProcessGroup(true, false); + + String statusRequest = "remoteProcessGroup:all:bulletins"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addExpectedRemoteProcessGroupStatus(expected, false, false, false, false, true, true); + + assertEquals(expected, actual); + } + + @Test + public void remoteProcessGroupStatusAuthorizationIssues() throws Exception { + populateRemoteProcessGroup(false, true); + + String statusRequest = "remoteProcessGroup:all:authorizationissues"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addExpectedRemoteProcessGroupStatus(expected, false, true, false, false, false, false); + + assertEquals(expected, actual); + } + + @Test + public void remoteProcessGroupStatusInputPorts() throws Exception { + populateRemoteProcessGroup(false, false); + + String statusRequest = "remoteProcessGroup:all:inputPorts"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addExpectedRemoteProcessGroupStatus(expected, false, false, true, false, false, false); + + 
assertEquals(expected, actual); + } + + @Test + public void remoteProcessGroupStatusStats() throws Exception { + populateRemoteProcessGroup(false, false); + + String statusRequest = "remoteProcessGroup:all:stats"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addExpectedRemoteProcessGroupStatus(expected, false, false, false, true, false, false); + + assertEquals(expected, actual); + } + + + @Test + public void remoteProcessGroupStatusAll() throws Exception { + populateRemoteProcessGroup(true, true); + + String statusRequest = "remoteProcessGroup:all:health, authorizationissues, bulletins, inputPorts, stats"; + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addExpectedRemoteProcessGroupStatus(expected, true, true, true, true, true, true); + + assertEquals(expected, actual); + } + + @Test + public void statusEverything() throws Exception { + when(bulletinRepo.findBulletins(anyObject())).thenReturn(Collections.emptyList()); + + populateControllerService(true, false); + populateInstance(true); + populateSystemDiagnostics(); + populateReportingTask(false, true); + populateConnection(); + populateProcessor(true, false); + populateRemoteProcessGroup(false, true); + + String statusRequest = "controllerServices:bulletins,health; processor:all:health,stats,bulletins; instance:bulletins,health,stats ; systemDiagnostics:garbagecollection, heap, " + + "processorstats, contentrepositoryusage, flowfilerepositoryusage; connection:all:health,stats; provenanceReporting:health,bulletins; remoteProcessGroup:all:health, " + + 
"authorizationissues, bulletins, inputPorts, stats"; + + FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class)); + + FlowStatusReport expected = new FlowStatusReport(); + expected.setErrorsGeneratingReport(Collections.EMPTY_LIST); + + addControllerServiceStatus(expected, true, true, true, false); + addInstanceStatus(expected, true, true, true, true); + addSystemDiagnosticStatus(expected, true, true, true, true, true); + addReportingTaskStatus(expected, true, true, true, false); + addConnectionStatus(expected, true, true); + addProcessorStatus(expected, true, true, true, true, false); + addExpectedRemoteProcessGroupStatus(expected, true, true, true, true, true, false); + + assertEquals(expected, actual); + } + + + /*************************** + * Populator methods + *************************/ + + private void addBulletinsToInstance() { + Bulletin bulletin = mock(Bulletin.class); + when(bulletin.getTimestamp()).thenReturn(new Date(1464019245000L)); + when(bulletin.getMessage()).thenReturn("Bulletin message"); + + List<Bulletin> bulletinList = new ArrayList<>(); + bulletinList.add(bulletin); + + when(bulletinRepo.findBulletinsForController()).thenReturn(bulletinList); + } + + private void populateSystemDiagnostics() { + SystemDiagnostics systemDiagnostics = new SystemDiagnostics(); + addGarbageCollectionToSystemDiagnostics(systemDiagnostics); + addHeapSystemDiagnostics(systemDiagnostics); + addContentRepoToSystemDiagnostics(systemDiagnostics); + addFlowFileRepoToSystemDiagnostics(systemDiagnostics); + addProcessorInfoToSystemDiagnostics(systemDiagnostics); + when(mockFlowController.getSystemDiagnostics()).thenReturn(systemDiagnostics); + } + + private void populateControllerService(boolean validationErrors, boolean addBulletins) { + ControllerServiceNode controllerServiceNode = mock(ControllerServiceNode.class); + addControllerServiceHealth(controllerServiceNode); + if 
(validationErrors) { + addValidationErrors(controllerServiceNode); + } + + if (addBulletins) { + addBulletins("Bulletin message", controllerServiceNode.getIdentifier()); + } + HashSet<ControllerServiceNode> controllerServiceNodes = new HashSet<>(); + controllerServiceNodes.add(controllerServiceNode); + when(mockFlowController.getAllControllerServices()).thenReturn(controllerServiceNodes); + } + + private void populateInstance(boolean addBulletins) { + setRootGroupStatusVariables(); + if (addBulletins) { + addBulletinsToInstance(); + } + } + + private void populateReportingTask(boolean addBulletins, boolean validationErrors) { + if (addBulletins) { + addBulletins("Bulletin message", "ReportProvenance"); + } + + ReportingTaskNode reportingTaskNode = mock(ReportingTaskNode.class); + addReportingTaskNodeVariables(reportingTaskNode); + + HashSet<ReportingTaskNode> reportingTaskNodes = new HashSet<>(); + reportingTaskNodes.add(reportingTaskNode); + + when(mockFlowController.getAllReportingTasks()).thenReturn(reportingTaskNodes); + + if (validationErrors) { + ValidationResult validationResult = new ValidationResult.Builder() + .input("input") + .subject("subject") + .explanation("is not valid") + .build(); + + ValidationResult validationResult2 = new ValidationResult.Builder() + .input("input2") + .subject("subject2") + .explanation("is not valid too") + .build(); + + List<ValidationResult> validationResultList = new ArrayList<>(); + validationResultList.add(validationResult); + validationResultList.add(validationResult2); + + when(reportingTaskNode.getValidationErrors()).thenReturn(validationResultList); + } else { + when(reportingTaskNode.getValidationErrors()).thenReturn(Collections.EMPTY_LIST); + } + } + + private void populateConnection() { + ConnectionStatus connectionStatus = new ConnectionStatus(); + connectionStatus.setQueuedBytes(100); + connectionStatus.setId("connectionId"); + connectionStatus.setName("connectionId"); + connectionStatus.setQueuedCount(10); + 
connectionStatus.setInputCount(1); + connectionStatus.setInputBytes(2); + connectionStatus.setOutputCount(3); + connectionStatus.setOutputBytes(4); + + Collection<ConnectionStatus> statusCollection = new ArrayList<>(); + statusCollection.add(connectionStatus); + + when(rootGroupStatus.getConnectionStatus()).thenReturn(statusCollection); + } + + private void populateProcessor(boolean validationErrors, boolean addBulletins) { + if (addBulletins) { + addBulletins("Bulletin message", "UpdateAttributeProcessorId"); + } + + ProcessorStatus processorStatus = new ProcessorStatus(); + processorStatus.setType("org.apache.nifi.processors.attributes.UpdateAttribute"); + processorStatus.setId("UpdateAttributeProcessorId"); + processorStatus.setName("UpdateAttributeProcessorId"); + processorStatus.setRunStatus(RunStatus.Stopped); + processorStatus.setActiveThreadCount(1); + processorStatus.setFlowFilesReceived(2); + processorStatus.setBytesRead(3); + processorStatus.setBytesWritten(4); + processorStatus.setFlowFilesSent(5); + processorStatus.setInvocations(6); + processorStatus.setProcessingNanos(7); + + Collection<ProcessorStatus> statusCollection = new ArrayList<>(); + statusCollection.add(processorStatus); + + mockProcessorEmptyValidation(processorStatus.getId(), processGroup); + when(rootGroupStatus.getProcessorStatus()).thenReturn(statusCollection); + + ProcessorNode processorNode = mock(ProcessorNode.class); + when(processGroup.getProcessor(processorStatus.getId())).thenReturn(processorNode); + + if (validationErrors) { + ValidationResult validationResult = new ValidationResult.Builder() + .input("input") + .subject("subject") + .explanation("is not valid") + .build(); + + ValidationResult validationResult2 = new ValidationResult.Builder() + .input("input2") + .subject("subject2") + .explanation("is not valid too") + .build(); + + List<ValidationResult> validationResultList = new ArrayList<>(); + validationResultList.add(validationResult); + 
validationResultList.add(validationResult2); + + when(processorNode.getValidationErrors()).thenReturn(validationResultList); + } else { + when(processorNode.getValidationErrors()).thenReturn(Collections.EMPTY_LIST); + } + } + + private void populateRemoteProcessGroup(boolean addBulletins, boolean addAuthIssues) { + when(mockFlowController.getGroup(mockFlowController.getRootGroupId())).thenReturn(processGroup); + + RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class); + when(processGroup.getRemoteProcessGroup(any())).thenReturn(remoteProcessGroup); + + RemoteGroupPort remoteGroupPort = mock(RemoteGroupPort.class); + when(remoteGroupPort.getName()).thenReturn("inputPort"); + when(remoteGroupPort.getTargetExists()).thenReturn(true); + when(remoteGroupPort.isTargetRunning()).thenReturn(false); + + when(remoteProcessGroup.getInputPorts()).thenReturn(Collections.singleton(remoteGroupPort)); + + RemoteProcessGroupStatus remoteProcessGroupStatus = new RemoteProcessGroupStatus(); + addRemoteProcessGroupStatus(remoteProcessGroupStatus); + if (addAuthIssues) { + remoteProcessGroupStatus.setAuthorizationIssues(Collections.singletonList("auth issue")); + } else { + remoteProcessGroupStatus.setAuthorizationIssues(Collections.EMPTY_LIST); + } + if (addBulletins) { + addBulletins("Bulletin message", remoteProcessGroupStatus.getId()); + } + when(rootGroupStatus.getRemoteProcessGroupStatus()).thenReturn(Collections.singletonList(remoteProcessGroupStatus)); + } + + + private void setRootGroupStatusVariables() { + when(rootGroupStatus.getQueuedContentSize()).thenReturn(1L); + when(rootGroupStatus.getQueuedCount()).thenReturn(2); + when(rootGroupStatus.getActiveThreadCount()).thenReturn(3); + when(rootGroupStatus.getBytesRead()).thenReturn(1L); + when(rootGroupStatus.getBytesWritten()).thenReturn(2L); + when(rootGroupStatus.getBytesSent()).thenReturn(3L); + when(rootGroupStatus.getFlowFilesSent()).thenReturn(4); + 
when(rootGroupStatus.getBytesTransferred()).thenReturn(5L); + when(rootGroupStatus.getFlowFilesTransferred()).thenReturn(6); + when(rootGroupStatus.getBytesReceived()).thenReturn(7L); + when(rootGroupStatus.getFlowFilesReceived()).thenReturn(8); + } + + private void addGarbageCollectionToSystemDiagnostics(SystemDiagnostics systemDiagnostics) { + Map<String, GarbageCollection> garbageCollectionMap = new HashMap<>(); + + GarbageCollection garbageCollection1 = new GarbageCollection(); + garbageCollection1.setCollectionCount(1); + garbageCollection1.setCollectionTime(10); + garbageCollection1.setName("garbage 1"); + garbageCollectionMap.put(garbageCollection1.getName(), garbageCollection1); + + systemDiagnostics.setGarbageCollection(garbageCollectionMap); + } + + private void addContentRepoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) { + Map<String, StorageUsage> stringStorageUsageMap = new HashMap<>(); + + StorageUsage repoUsage1 = new StorageUsage(); + repoUsage1.setFreeSpace(30); + repoUsage1.setTotalSpace(100); + repoUsage1.setIdentifier("Content repo1"); + stringStorageUsageMap.put(repoUsage1.getIdentifier(), repoUsage1); + + systemDiagnostics.setContentRepositoryStorageUsage(stringStorageUsageMap); + } + + private void addFlowFileRepoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) { + StorageUsage repoUsage = new StorageUsage(); + repoUsage.setFreeSpace(30); + repoUsage.setTotalSpace(100); + repoUsage.setIdentifier("FlowFile repo"); + systemDiagnostics.setFlowFileRepositoryStorageUsage(repoUsage); + } + + private void addHeapSystemDiagnostics(SystemDiagnostics systemDiagnostics) { + systemDiagnostics.setMaxHeap(5); + systemDiagnostics.setTotalHeap(3); + systemDiagnostics.setUsedHeap(2); + systemDiagnostics.setMaxNonHeap(9); + systemDiagnostics.setTotalNonHeap(8); + systemDiagnostics.setUsedNonHeap(6); + } + + private void addProcessorInfoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) { + 
systemDiagnostics.setProcessorLoadAverage(80.9); + systemDiagnostics.setAvailableProcessors(5); + } + + private void mockProcessorEmptyValidation(String id, ProcessGroup processGroup) { + ProcessorNode processorNode = mock(ProcessorNode.class); + when(processGroup.getProcessor(id)).thenReturn(processorNode); + when(processorNode.getValidationErrors()).thenReturn(Collections.emptyList()); + } + + private void addControllerServiceHealth(ControllerServiceNode controllerServiceNode) { + when(controllerServiceNode.getName()).thenReturn("mockControllerService"); + when(controllerServiceNode.getIdentifier()).thenReturn("mockControllerService"); + when(controllerServiceNode.getState()).thenReturn(ControllerServiceState.ENABLED); + when(controllerServiceNode.getValidationErrors()).thenReturn(Collections.emptyList()); + } + + private void addReportingTaskNodeVariables(ReportingTaskNode reportingTaskNode) { + when(reportingTaskNode.getValidationErrors()).thenReturn(Collections.emptyList()); + when(reportingTaskNode.getActiveThreadCount()).thenReturn(1); + when(reportingTaskNode.getScheduledState()).thenReturn(ScheduledState.RUNNING); + when(reportingTaskNode.getIdentifier()).thenReturn("ReportProvenance"); + when(reportingTaskNode.getName()).thenReturn("ReportProvenance"); + + } + + private void addRemoteProcessGroupStatus(RemoteProcessGroupStatus remoteProcessGroupStatus) { + remoteProcessGroupStatus.setName("rpg1"); + remoteProcessGroupStatus.setId("rpg1"); + remoteProcessGroupStatus.setTransmissionStatus(TransmissionStatus.Transmitting); + remoteProcessGroupStatus.setActiveRemotePortCount(1); + remoteProcessGroupStatus.setInactiveRemotePortCount(2); + + remoteProcessGroupStatus.setActiveThreadCount(3); + remoteProcessGroupStatus.setSentContentSize(4L); + remoteProcessGroupStatus.setSentCount(5); + } + + private void addBulletins(String message, String sourceId) { + Bulletin bulletin = mock(Bulletin.class); + when(bulletin.getTimestamp()).thenReturn(new 
Date(1464019245000L)); + when(bulletin.getMessage()).thenReturn(message); + + List<Bulletin> bulletinList = new ArrayList<>(); + bulletinList.add(bulletin); + + BulletinQueryAnswer bulletinQueryAnswer = new BulletinQueryAnswer(sourceId, bulletinList); + when(bulletinRepo.findBulletins(anyObject())).then(bulletinQueryAnswer); + } + + private void addValidationErrors(ConfiguredComponent connectable) { + ValidationResult validationResult = new ValidationResult.Builder() + .input("input") + .subject("subject") + .explanation("is not valid") + .build(); + + ValidationResult validationResult2 = new ValidationResult.Builder() + .input("input2") + .subject("subject2") + .explanation("is not valid too") + .build(); + + List<ValidationResult> validationResultList = new ArrayList<>(); + validationResultList.add(validationResult); + validationResultList.add(validationResult2); + when(connectable.getValidationErrors()).thenReturn(validationResultList); + } + + private class BulletinQueryAnswer implements Answer { + + final List<Bulletin> bulletinList; + String idToMatch = ""; + + private BulletinQueryAnswer(String idToMatch, List<Bulletin> bulletinList) { + this.idToMatch = idToMatch; + this.bulletinList = bulletinList; + } + + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + BulletinQuery bulletinQuery = (BulletinQuery) invocationOnMock.getArguments()[0]; + if (idToMatch.equals(bulletinQuery.getSourceIdPattern().toString())) { + return bulletinList; + } + return Collections.emptyList(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/pom.xml b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/pom.xml new file mode 100644 index 
0000000..c8b497b --- /dev/null +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>minifi-framework</artifactId> + <groupId>org.apache.nifi.minifi</groupId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>minifi-nar-utils</artifactId> + + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-properties</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java ---------------------------------------------------------------------- diff --git 
a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java new file mode 100644 index 0000000..db0b35e --- /dev/null +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionManager.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.nar;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import org.apache.nifi.authentication.LoginIdentityProvider;

import org.apache.nifi.authorization.AuthorityProvider;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.reporting.ReportingTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Scans through the classpath to load the implementations of every service definition listed in {@code definitionMap} (Processors, FlowFilePrioritizers, ReportingTasks, ControllerServices,
 * repositories, ...) using the service provider API and running through all classloaders (system, NARs).
 *
 * <p>
 * Thread-safety: all state is held in static, unsynchronized {@link HashMap}s that are mutated by {@link #discoverExtensions()}, so this class is NOT immutable and performs no synchronization of
 * its own.
 */
@SuppressWarnings("rawtypes")
public class ExtensionManager {

    private static final Logger logger = LoggerFactory.getLogger(ExtensionManager.class);

    // Maps a service definition (interface) to those classes that implement the interface
    private static final Map<Class, Set<Class>> definitionMap = new HashMap<>();

    // Maps the fully qualified name of an extension class to the class loader it was first registered from
    private static final Map<String, ClassLoader> extensionClassloaderLookup = new HashMap<>();

    static {
        definitionMap.put(Processor.class, new HashSet<Class>());
        definitionMap.put(FlowFilePrioritizer.class, new HashSet<Class>());
        definitionMap.put(ReportingTask.class, new HashSet<Class>());
        definitionMap.put(ControllerService.class, new HashSet<Class>());
        definitionMap.put(AuthorityProvider.class, new HashSet<Class>());
        definitionMap.put(LoginIdentityProvider.class, new HashSet<Class>());
        definitionMap.put(ProvenanceEventRepository.class, new HashSet<Class>());
        definitionMap.put(ComponentStatusRepository.class, new HashSet<Class>());
        definitionMap.put(FlowFileRepository.class, new HashSet<Class>());
        definitionMap.put(FlowFileSwapManager.class, new HashSet<Class>());
        definitionMap.put(ContentRepository.class, new HashSet<Class>());
    }

    /**
     * Loads all extension class types that can be found on the system classloader and on every NAR classloader returned by {@code NarClassLoaders.getExtensionClassLoaders()}.
     *
     * <p>
     * While a NAR classloader is being scanned it is installed as the calling thread's context classloader. The context classloader that was in place on entry is put back when this method
     * exits, whether it returns normally or a loader fails part-way through. If the thread had no context classloader on entry, the last NAR classloader is left in place.
     */
    public static void discoverExtensions() {
        final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();

        // remember the caller's context class loader so it can be restored afterwards
        final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();

        try {
            // consider the system class loader
            loadExtensions(systemClassLoader);

            // consider each nar class loader
            for (final ClassLoader ncl : NarClassLoaders.getExtensionClassLoaders()) {

                // Must set the context class loader to the nar classloader itself
                // so that static initialization techniques that depend on the context class loader will work properly
                Thread.currentThread().setContextClassLoader(ncl);
                loadExtensions(ncl);
            }
        } finally {
            // restore the original context class loader even when discovery fails (e.g. ServiceLoader iteration throws),
            // otherwise the calling thread would stay pinned to a NAR class loader
            if (currentContextClassLoader != null) {
                Thread.currentThread().setContextClassLoader(currentContextClassLoader);
            }
        }
    }

    /**
     * Loads extensions from the specified class loader.
     *
     * @param classLoader from which to load extensions
     */
    @SuppressWarnings("unchecked")
    private static void loadExtensions(final ClassLoader classLoader) {
        for (final Map.Entry<Class, Set<Class>> entry : definitionMap.entrySet()) {
            final ServiceLoader<?> serviceLoader = ServiceLoader.load(entry.getKey(), classLoader);

            // iterating the ServiceLoader instantiates each provider; only its class is recorded
            for (final Object o : serviceLoader) {
                registerServiceClass(o.getClass(), extensionClassloaderLookup, classLoader, entry.getValue());
            }
        }
    }

    /**
     * Registers extension for the specified type from the specified ClassLoader.
     *
     * <p>
     * The first classloader to present a class name wins. A later registration of the same name is ignored silently when the earlier classloader is an ancestor of {@code classLoader}, and is
     * ignored with a warning otherwise. The lookup is keyed by class name only, so a name already registered by this very classloader also takes the warning path.
     *
     * @param type the extension type
     * @param classloaderMap mapping of classname to classloader
     * @param classLoader the classloader being mapped to
     * @param classes the set of implementations of the service definition being scanned; {@code type} is added to it on first registration
     */
    private static void registerServiceClass(final Class<?> type, final Map<String, ClassLoader> classloaderMap, final ClassLoader classLoader, final Set<Class> classes) {
        final String className = type.getName();
        final ClassLoader registeredClassLoader = classloaderMap.get(className);

        // see if this class is already registered (this should happen when the class is loaded by an ancestor of the specified classloader)
        if (registeredClassLoader == null) {
            classloaderMap.put(className, classLoader);
            classes.add(type);
            return;
        }

        // determine if this class was loaded from an ancestor
        boolean loadedFromAncestor = false;
        ClassLoader ancestorClassLoader = classLoader.getParent();
        while (ancestorClassLoader != null) {
            if (ancestorClassLoader == registeredClassLoader) {
                loadedFromAncestor = true;
                break;
            }
            ancestorClassLoader = ancestorClassLoader.getParent();
        }

        // if this class was loaded from a non ancestor class loader, report potential unexpected behavior
        if (!loadedFromAncestor) {
            logger.warn("Attempt was made to load {} from {} but that class name is already loaded/registered from {}. This may cause unpredictable behavior. Order of NARs is not guaranteed.",
                    className, classLoader, registeredClassLoader);
        }
    }

    /**
     * Determines the effective classloader for classes of the given type. If returns null it indicates the given type is not known or was not detected.
     *
     * @param classType fully qualified class name to lookup the classloader of
     * @return the classloader the class name was registered from; null if not a detected type
     */
    public static ClassLoader getClassLoader(final String classType) {
        return extensionClassloaderLookup.get(classType);
    }

    /**
     * Returns the implementations discovered for a service definition.
     *
     * @param definition the service definition (interface) to look up
     * @return the internal (live, mutable) set of implementing classes for a known definition; an empty set if the definition is not one of the tracked types
     */
    public static Set<Class> getExtensions(final Class<?> definition) {
        final Set<Class> extensions = definitionMap.get(definition);
        return (extensions == null) ? Collections.<Class>emptySet() : extensions;
    }

    /**
     * Logs, at INFO level, every tracked service definition together with each discovered implementation and the classloader it is registered to.
     */
    public static void logClassLoaderMapping() {
        final StringBuilder builder = new StringBuilder();

        builder.append("Extension Type Mapping to Classloader:");
        for (final Map.Entry<Class, Set<Class>> entry : definitionMap.entrySet()) {
            builder.append("\n\t=== ").append(entry.getKey().getSimpleName()).append(" type || Classloader ===");

            for (final Class type : entry.getValue()) {
                builder.append("\n\t").append(type.getName()).append(" || ").append(getClassLoader(type.getName()));
            }

            builder.append("\n\t=== End ").append(entry.getKey().getSimpleName()).append(" types ===");
        }

        logger.info(builder.toString());
    }
}
 http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionMapping.java ---------------------------------------------------------------------- diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionMapping.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionMapping.java new file mode 100644 index 0000000..c478d97 --- /dev/null +++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/ExtensionMapping.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
/**
 * Holds the names of extensions grouped into processors, controller services
 * and reporting tasks.
 *
 * <p>
 * Names are appended through the package-private {@code add*} methods and are
 * exposed to callers only as unmodifiable views or as a fresh combined list.
 * </p>
 */
public class ExtensionMapping {

    private final List<String> processors = new ArrayList<>();
    private final List<String> controllerServices = new ArrayList<>();
    private final List<String> reportingTasks = new ArrayList<>();

    /**
     * Appends a single processor name.
     *
     * @param processorName the name to record
     */
    void addProcessor(final String processorName) {
        processors.add(processorName);
    }

    /**
     * Appends every processor name in the given collection, in iteration order.
     *
     * @param processorNames the names to record
     */
    void addAllProcessors(final Collection<String> processorNames) {
        processors.addAll(processorNames);
    }

    /**
     * Appends a single controller service name.
     *
     * @param controllerServiceName the name to record
     */
    void addControllerService(final String controllerServiceName) {
        controllerServices.add(controllerServiceName);
    }

    /**
     * Appends every controller service name in the given collection, in iteration order.
     *
     * @param controllerServiceNames the names to record
     */
    void addAllControllerServices(final Collection<String> controllerServiceNames) {
        controllerServices.addAll(controllerServiceNames);
    }

    /**
     * Appends a single reporting task name.
     *
     * @param reportingTaskName the name to record
     */
    void addReportingTask(final String reportingTaskName) {
        reportingTasks.add(reportingTaskName);
    }

    /**
     * Appends every reporting task name in the given collection, in iteration order.
     *
     * @param reportingTaskNames the names to record
     */
    void addAllReportingTasks(final Collection<String> reportingTaskNames) {
        reportingTasks.addAll(reportingTaskNames);
    }

    /**
     * @return an unmodifiable view of the recorded processor names
     */
    public List<String> getProcessorNames() {
        return Collections.unmodifiableList(processors);
    }

    /**
     * @return an unmodifiable view of the recorded controller service names
     */
    public List<String> getControllerServiceNames() {
        return Collections.unmodifiableList(controllerServices);
    }

    /**
     * @return an unmodifiable view of the recorded reporting task names
     */
    public List<String> getReportingTaskNames() {
        return Collections.unmodifiableList(reportingTasks);
    }

    /**
     * @return a new, modifiable list holding the processor names, then the
     * controller service names, then the reporting task names
     */
    public List<String> getAllExtensionNames() {
        final List<String> combined = new ArrayList<>(processors);
        combined.addAll(controllerServices);
        combined.addAll(reportingTasks);
        return combined;
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.nar; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; + +/** + * <p> + * A <tt>ClassLoader</tt> for loading NARs (NiFi archives). NARs are designed to + * allow isolating bundles of code (comprising one-or-more NiFi + * <tt>FlowFileProcessor</tt>s, <tt>FlowFileComparator</tt>s and their + * dependencies) from other such bundles; this allows for dependencies and + * processors that require conflicting, incompatible versions of the same + * dependency to run in a single instance of NiFi.</p> + * + * <p> + * <tt>NarClassLoader</tt> follows the delegation model described in + * {@link ClassLoader#findClass(String) ClassLoader.findClass(...)}; + * classes are first loaded from the parent <tt>ClassLoader</tt>, and only if + * they cannot be found there does the <tt>NarClassLoader</tt> provide a + * definition. Specifically, this means that resources are loaded from NiFi's + * <tt>conf</tt> + * and <tt>lib</tt> directories first, and if they cannot be found there, are + * loaded from the NAR.</p> + * + * <p> + * The packaging of a NAR is such that it is a ZIP file with the following + * directory structure: + * + * <pre> + * +META-INF/ + * +-- bundled-dependencies/ + * +-- <JAR files> + * +-- MANIFEST.MF + * </pre> + * </p> + * + * <p> + * The MANIFEST.MF file contains the same information as a typical JAR file but + * also includes two additional NiFi properties: {@code Nar-Id} and + * {@code Nar-Dependency-Id}. + * </p> + * + * <p> + * The {@code Nar-Id} provides a unique identifier for this NAR. + * </p> + * + * <p> + * The {@code Nar-Dependency-Id} is optional. If provided, it indicates that + * this NAR should inherit all of the dependencies of the NAR with the provided + * ID. 
Often times, the NAR that is depended upon is referred to as the Parent. + * This is because its ClassLoader will be the parent ClassLoader of the + * dependent NAR. + * </p> + * + * <p> + * If a NAR is built using NiFi's Maven NAR Plugin, the {@code Nar-Id} property + * will be set to the artifactId of the NAR. The {@code Nar-Dependency-Id} will + * be set to the artifactId of the NAR that is depended upon. For example, if + * NAR A is defined as such: + * + * <pre> + * ... + * <artifactId>nar-a</artifactId> + * <packaging>nar</packaging> + * ... + * <dependencies> + * <dependency> + * <groupId>group</groupId> + * <artifactId>nar-z</artifactId> + * <b><type>nar</type></b> + * </dependency> + * </dependencies> + * </pre> + * </p> + * + * + * <p> + * Then the MANIFEST.MF file that is created for NAR A will have the following + * properties set: + * <ul> + * <li>{@code Nar-Id: nar-a}</li> + * <li>{@code Nar-Dependency-Id: nar-z}</li> + * </ul> + * </p> + * + * <p> + * Note, above, that the {@code type} of the dependency is set to {@code nar}. + * </p> + * + * <p> + * If the NAR has more than one dependency of {@code type} {@code nar}, then the + * Maven NAR plugin will fail to build the NAR. + * </p> + */ +public class NarClassLoader extends URLClassLoader { + + private static final Logger LOGGER = LoggerFactory.getLogger(NarClassLoader.class); + + private static final FileFilter JAR_FILTER = new FileFilter() { + @Override + public boolean accept(File pathname) { + final String nameToTest = pathname.getName().toLowerCase(); + return nameToTest.endsWith(".jar") && pathname.isFile(); + } + }; + + /** + * The NAR for which this <tt>ClassLoader</tt> is responsible. + */ + private final File narWorkingDirectory; + + /** + * Construct a nar class loader. + * + * @param narWorkingDirectory directory to explode nar contents to + * @throws IllegalArgumentException if the NAR is missing the Java Services + * API file for <tt>FlowFileProcessor</tt> implementations. 
+ * @throws ClassNotFoundException if any of the <tt>FlowFileProcessor</tt> + * implementations defined by the Java Services API cannot be loaded. + * @throws IOException if an error occurs while loading the NAR. + */ + public NarClassLoader(final File narWorkingDirectory) throws ClassNotFoundException, IOException { + super(new URL[0]); + this.narWorkingDirectory = narWorkingDirectory; + + // process the classpath + updateClasspath(narWorkingDirectory); + } + + /** + * Construct a nar class loader with the specific parent. + * + * @param narWorkingDirectory directory to explode nar contents to + * @param parentClassLoader parent class loader of this nar + * @throws IllegalArgumentException if the NAR is missing the Java Services + * API file for <tt>FlowFileProcessor</tt> implementations. + * @throws ClassNotFoundException if any of the <tt>FlowFileProcessor</tt> + * implementations defined by the Java Services API cannot be loaded. + * @throws IOException if an error occurs while loading the NAR. + */ + public NarClassLoader(final File narWorkingDirectory, final ClassLoader parentClassLoader) throws ClassNotFoundException, IOException { + super(new URL[0], parentClassLoader); + this.narWorkingDirectory = narWorkingDirectory; + + // process the classpath + updateClasspath(narWorkingDirectory); + } + + public File getWorkingDirectory() { + return narWorkingDirectory; + } + + /** + * Adds URLs for the resources unpacked from this NAR: + * <ul><li>the root: for classes, <tt>META-INF</tt>, etc.</li> + * <li><tt>META-INF/dependencies</tt>: for config files, <tt>.so</tt>s, + * etc.</li> + * <li><tt>META-INF/dependencies/*.jar</tt>: for dependent + * libraries</li></ul> + * + * @param root the root directory of the unpacked NAR. + * @throws IOException if the URL list could not be updated. + */ + private void updateClasspath(File root) throws IOException { + addURL(root.toURI().toURL()); // for compiled classes, META-INF/, etc. 
+ + File dependencies = new File(root, "META-INF/bundled-dependencies"); + if (!dependencies.isDirectory()) { + LOGGER.warn(narWorkingDirectory + " does not contain META-INF/bundled-dependencies!"); + } + addURL(dependencies.toURI().toURL()); + if (dependencies.isDirectory()) { + for (File libJar : dependencies.listFiles(JAR_FILTER)) { + addURL(libJar.toURI().toURL()); + } + } + } + + @Override + protected String findLibrary(final String libname) { + File dependencies = new File(narWorkingDirectory, "META-INF/bundled-dependencies"); + if (!dependencies.isDirectory()) { + LOGGER.warn(narWorkingDirectory + " does not contain META-INF/bundled-dependencies!"); + } + + final File nativeDir = new File(dependencies, "native"); + final File libsoFile = new File(nativeDir, "lib" + libname + ".so"); + final File dllFile = new File(nativeDir, libname + ".dll"); + final File soFile = new File(nativeDir, libname + ".so"); + if (libsoFile.exists()) { + return libsoFile.getAbsolutePath(); + } else if (dllFile.exists()) { + return dllFile.getAbsolutePath(); + } else if (soFile.exists()) { + return soFile.getAbsolutePath(); + } + + // not found in the nar. try system native dir + return null; + } + + @Override + public String toString() { + return NarClassLoader.class.getName() + "[" + narWorkingDirectory.getPath() + "]"; + } +}
