http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index f770f5e..557bd62 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -16,39 +16,34 @@ */ package org.apache.nifi.controller; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; - import org.apache.commons.io.IOUtils; import org.apache.nifi.admin.service.AuditService; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer; import org.apache.nifi.authorization.AccessPolicy; import org.apache.nifi.authorization.Group; import org.apache.nifi.authorization.MockPolicyBasedAuthorizer; import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.User; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.exception.ProcessorInstantiationException; import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.controller.serialization.FlowSynchronizer; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.controller.service.mock.DummyProcessor; +import org.apache.nifi.controller.service.mock.DummyReportingTask; +import org.apache.nifi.controller.service.mock.ServiceA; +import org.apache.nifi.controller.service.mock.ServiceB; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.logging.LogRepository; +import org.apache.nifi.logging.LogRepositoryFactory; +import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.MockProvenanceRepository; import org.apache.nifi.registry.VariableRegistry; @@ -56,21 +51,53 @@ import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.web.api.dto.BundleDTO; +import org.apache.nifi.web.api.dto.ControllerServiceDTO; +import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.PositionDTO; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.dto.ProcessorDTO; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestFlowController { private FlowController controller; private AbstractPolicyBasedAuthorizer authorizer; - private StandardFlowSynchronizer standardFlowSynchronizer; private FlowFileEventRepository flowFileEventRepo; private AuditService auditService; private StringEncryptor encryptor; private NiFiProperties nifiProperties; + private Bundle systemBundle; private BulletinRepository bulletinRepo; private VariableRegistry variableRegistry; @@ -87,6 +114,10 @@ public class TestFlowController { nifiProperties = NiFiProperties.createBasicNiFiProperties(null, otherProps); encryptor = StringEncryptor.createEncryptor(nifiProperties); + // use the system bundle + systemBundle = ExtensionManager.createSystemBundle(nifiProperties); + ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); + User user1 = new User.Builder().identifier("user-id-1").identity("user-1").build(); User user2 = new User.Builder().identifier("user-id-2").identity("user-2").build(); @@ -128,8 +159,6 @@ public class TestFlowController { bulletinRepo = Mockito.mock(BulletinRepository.class); controller = FlowController.createStandaloneInstance(flowFileEventRepo, nifiProperties, authorizer, auditService, encryptor, bulletinRepo, variableRegistry); - - standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties); } @After @@ -139,6 +168,8 @@ public class TestFlowController { @Test public void testSynchronizeFlowWithReportingTaskAndProcessorReferencingControllerService() throws IOException { + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties); + // create a mock proposed data flow with the same auth fingerprint as the current authorizer final String authFingerprint = authorizer.getFingerprint(); final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class); @@ -200,6 +231,8 @@ public class TestFlowController { @Test public void testSynchronizeFlowWithProcessorReferencingControllerService() throws IOException { + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties); + // create a mock proposed data flow with the same auth fingerprint as the current authorizer final String authFingerprint = authorizer.getFingerprint(); final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class); @@ -237,6 +270,8 @@ public class TestFlowController { @Test public void testSynchronizeFlowWhenAuthorizationsAreEqual() { + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties); + // create a mock proposed data flow with the same auth fingerprint as the current authorizer final String authFingerprint = authorizer.getFingerprint(); final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class); @@ -249,6 +284,8 @@ public class TestFlowController { @Test(expected = UninheritableFlowException.class) public void testSynchronizeFlowWhenAuthorizationsAreDifferent() { + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties); + // create a mock proposed data flow with different auth fingerprint as the current authorizer final String authFingerprint = "<authorizations></authorizations>"; final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class); @@ -260,6 +297,8 @@ public class TestFlowController { @Test(expected = UninheritableFlowException.class) public void testSynchronizeFlowWhenProposedAuthorizationsAreNull() { + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties); + final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class); when(proposedDataFlow.getAuthorizerFingerprint()).thenReturn(null); @@ -268,6 +307,8 @@ public class TestFlowController { @Test public void testSynchronizeFlowWhenCurrentAuthorizationsAreEmptyAndProposedAreNot() { + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties); + // create a mock proposed data flow with the same auth fingerprint as the current authorizer final String authFingerprint = authorizer.getFingerprint(); final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class); @@ -283,8 +324,115 @@ public class TestFlowController { } @Test + public void testSynchronizeFlowWhenProposedMissingComponentsAreDifferent() { + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties); + + final Set<String> missingComponents = new HashSet<>(); + missingComponents.add("1"); + missingComponents.add("2"); + + final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class); + when(proposedDataFlow.getMissingComponents()).thenReturn(missingComponents); + + try { + controller.synchronize(standardFlowSynchronizer, proposedDataFlow); + Assert.fail("Should have thrown exception"); + } catch (UninheritableFlowException e) { + assertTrue(e.getMessage().contains("Proposed flow has missing components that are not considered missing in the current flow (1,2)")); + } + } + + @Test + public void testSynchronizeFlowWhenExistingMissingComponentsAreDifferent() throws IOException { + final StringEncryptor stringEncryptor = StringEncryptor.createEncryptor(nifiProperties); + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(stringEncryptor, nifiProperties); + + final ProcessorNode mockProcessorNode = mock(ProcessorNode.class); + when(mockProcessorNode.getIdentifier()).thenReturn("1"); + when(mockProcessorNode.isExtensionMissing()).thenReturn(true); + + final ControllerServiceNode mockControllerServiceNode = mock(ControllerServiceNode.class); + when(mockControllerServiceNode.getIdentifier()).thenReturn("2"); + when(mockControllerServiceNode.isExtensionMissing()).thenReturn(true); + + final ReportingTaskNode mockReportingTaskNode = mock(ReportingTaskNode.class); + when(mockReportingTaskNode.getIdentifier()).thenReturn("3"); + when(mockReportingTaskNode.isExtensionMissing()).thenReturn(true); + + final ProcessGroup mockRootGroup = mock(ProcessGroup.class); + when(mockRootGroup.findAllProcessors()).thenReturn(Collections.singletonList(mockProcessorNode)); + + final SnippetManager mockSnippetManager = mock(SnippetManager.class); + when(mockSnippetManager.export()).thenReturn(new byte[0]); + + final FlowController mockFlowController = mock(FlowController.class); + when(mockFlowController.getRootGroup()).thenReturn(mockRootGroup); + when(mockFlowController.getAllControllerServices()).thenReturn(new HashSet<>(Arrays.asList(mockControllerServiceNode))); + when(mockFlowController.getAllReportingTasks()).thenReturn(new HashSet<>(Arrays.asList(mockReportingTaskNode))); + when(mockFlowController.getAuthorizer()).thenReturn(authorizer); + when(mockFlowController.getSnippetManager()).thenReturn(mockSnippetManager); + + final DataFlow proposedDataFlow = Mockito.mock(DataFlow.class); + when(proposedDataFlow.getMissingComponents()).thenReturn(new HashSet<>()); + + try { + standardFlowSynchronizer.sync(mockFlowController, proposedDataFlow, stringEncryptor); + Assert.fail("Should have thrown exception"); + } catch (UninheritableFlowException e) { + assertTrue(e.getMessage(), e.getMessage().contains("Current flow has missing components that are not considered missing in the proposed flow (1,2,3)")); + } + } + + @Test + public void testSynchronizeFlowWhenBundlesAreSame() throws IOException { + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties); + + final LogRepository logRepository = LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df"); + logRepository.removeAllObservers(); + + syncFlow("src/test/resources/nifi/fingerprint/flow4.xml", standardFlowSynchronizer); + syncFlow("src/test/resources/nifi/fingerprint/flow4.xml", standardFlowSynchronizer); + } + + @Test + public void testSynchronizeFlowWhenBundlesAreDifferent() throws IOException { + final FlowSynchronizer standardFlowSynchronizer = new StandardFlowSynchronizer(StringEncryptor.createEncryptor(nifiProperties), nifiProperties); + + final LogRepository logRepository = LogRepositoryFactory.getRepository("d89ada5d-35fb-44ff-83f1-4cc00b48b2df"); + logRepository.removeAllObservers(); + + // first sync should work because we are syncing to an empty flow controller + syncFlow("src/test/resources/nifi/fingerprint/flow4.xml", standardFlowSynchronizer); + + // second sync should fail because the bundle of the processor is different + try { + syncFlow("src/test/resources/nifi/fingerprint/flow4-with-different-bundle.xml", standardFlowSynchronizer); + Assert.fail("Should have thrown UninheritableFlowException"); + } catch (UninheritableFlowException e) { + //e.printStackTrace(); + } + } + + private void syncFlow(String flowXmlFile, FlowSynchronizer standardFlowSynchronizer) throws IOException { + String flowString = null; + try (final InputStream in = new FileInputStream(flowXmlFile)) { + flowString = IOUtils.toString(in, StandardCharsets.UTF_8); + } + assertNotNull(flowString); + + final DataFlow proposedDataFlow1 = Mockito.mock(DataFlow.class); + when(proposedDataFlow1.getFlow()).thenReturn(flowString.getBytes(StandardCharsets.UTF_8)); + + final String authFingerprint = authorizer.getFingerprint(); + when(proposedDataFlow1.getAuthorizerFingerprint()).thenReturn(authFingerprint.getBytes(StandardCharsets.UTF_8)); + + controller.synchronize(standardFlowSynchronizer, proposedDataFlow1); + } + + @Test public void testCreateMissingProcessor() throws ProcessorInstantiationException { - final ProcessorNode procNode = controller.createProcessor("org.apache.nifi.NonExistingProcessor", "1234-Processor"); + final ProcessorNode procNode = controller.createProcessor("org.apache.nifi.NonExistingProcessor", "1234-Processor", + systemBundle.getBundleDetails().getCoordinate()); assertNotNull(procNode); assertEquals("org.apache.nifi.NonExistingProcessor", procNode.getCanonicalClassName()); assertEquals("(Missing) NonExistingProcessor", procNode.getComponentType()); @@ -301,7 +449,8 @@ public class TestFlowController { @Test public void testCreateMissingReportingTask() throws ReportingTaskInstantiationException { - final ReportingTaskNode taskNode = controller.createReportingTask("org.apache.nifi.NonExistingReportingTask", "1234-Reporting-Task", true); + final ReportingTaskNode taskNode = controller.createReportingTask("org.apache.nifi.NonExistingReportingTask", "1234-Reporting-Task", + systemBundle.getBundleDetails().getCoordinate(), true); assertNotNull(taskNode); assertEquals("org.apache.nifi.NonExistingReportingTask", taskNode.getCanonicalClassName()); assertEquals("(Missing) NonExistingReportingTask", taskNode.getComponentType()); @@ -315,7 +464,8 @@ public class TestFlowController { @Test public void testCreateMissingControllerService() throws ProcessorInstantiationException { - final ControllerServiceNode serviceNode = controller.createControllerService("org.apache.nifi.NonExistingControllerService", "1234-Controller-Service", false); + final ControllerServiceNode serviceNode = controller.createControllerService("org.apache.nifi.NonExistingControllerService", "1234-Controller-Service", + systemBundle.getBundleDetails().getCoordinate(), false); assertNotNull(serviceNode); assertEquals("org.apache.nifi.NonExistingControllerService", serviceNode.getCanonicalClassName()); assertEquals("(Missing) NonExistingControllerService", serviceNode.getComponentType()); @@ -333,8 +483,9 @@ public class TestFlowController { } @Test - public void testProcessorDefaultScheduleAnnotation() throws ProcessorInstantiationException { - ProcessorNode p_scheduled = controller.createProcessor(DummyScheduledProcessor.class.getName(),"1234-ScheduledProcessor"); + public void testProcessorDefaultScheduleAnnotation() throws ProcessorInstantiationException,ClassNotFoundException,InstantiationException,IllegalAccessException { + ProcessorNode p_scheduled = controller.createProcessor(DummyScheduledProcessor.class.getName(),"1234-ScheduledProcessor", + systemBundle.getBundleDetails().getCoordinate()); assertEquals(5,p_scheduled.getMaxConcurrentTasks()); assertEquals(SchedulingStrategy.CRON_DRIVEN,p_scheduled.getSchedulingStrategy()); assertEquals("0 0 0 1/1 * ?",p_scheduled.getSchedulingPeriod()); @@ -345,14 +496,15 @@ public class TestFlowController { @Test public void testReportingTaskDefaultScheduleAnnotation() throws ReportingTaskInstantiationException { - ReportingTaskNode p_scheduled = controller.createReportingTask(DummyScheduledReportingTask.class.getName()); + ReportingTaskNode p_scheduled = controller.createReportingTask(DummyScheduledReportingTask.class.getName(), systemBundle.getBundleDetails().getCoordinate()); assertEquals(SchedulingStrategy.CRON_DRIVEN,p_scheduled.getSchedulingStrategy()); assertEquals("0 0 0 1/1 * ?",p_scheduled.getSchedulingPeriod()); } @Test - public void testProcessorDefaultSettingsAnnotation() throws ProcessorInstantiationException { - ProcessorNode p_settings = controller.createProcessor(DummySettingsProcessor.class.getName(),"1234-SettingsProcessor"); + public void testProcessorDefaultSettingsAnnotation() throws ProcessorInstantiationException,ClassNotFoundException { + + ProcessorNode p_settings = controller.createProcessor(DummySettingsProcessor.class.getName(),"1234-SettingsProcessor", systemBundle.getBundleDetails().getCoordinate()); assertEquals("5 sec",p_settings.getYieldPeriod()); assertEquals("1 min",p_settings.getPenalizationPeriod()); assertEquals(LogLevel.DEBUG,p_settings.getBulletinLevel()); @@ -365,7 +517,8 @@ public class TestFlowController { public void testDeleteProcessGroup() { ProcessGroup pg = controller.createProcessGroup("my-process-group"); pg.setName("my-process-group"); - ControllerServiceNode cs = controller.createControllerService("org.apache.nifi.NonExistingControllerService", "my-controller-service", false); + ControllerServiceNode cs = controller.createControllerService("org.apache.nifi.NonExistingControllerService", "my-controller-service", + systemBundle.getBundleDetails().getCoordinate(), false); pg.addControllerService(cs); controller.getRootGroup().addProcessGroup(pg); controller.getRootGroup().removeProcessGroup(pg); @@ -373,4 +526,281 @@ public class TestFlowController { assertTrue(pg.getControllerServices(true).isEmpty()); } + @Test + public void testChangeProcessorType() throws ProcessorInstantiationException { + final String id = "1234-ScheduledProcessor" + System.currentTimeMillis(); + final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate(); + final ProcessorNode processorNode = controller.createProcessor(DummyScheduledProcessor.class.getName(), id, coordinate); + final String originalName = processorNode.getName(); + + assertEquals(id, processorNode.getIdentifier()); + assertEquals(id, processorNode.getComponent().getIdentifier()); + assertEquals(coordinate.getCoordinate(), processorNode.getBundleCoordinate().getCoordinate()); + assertEquals(DummyScheduledProcessor.class.getCanonicalName(), processorNode.getCanonicalClassName()); + assertEquals(DummyScheduledProcessor.class.getSimpleName(), processorNode.getComponentType()); + assertEquals(DummyScheduledProcessor.class.getCanonicalName(), processorNode.getComponent().getClass().getCanonicalName()); + + assertEquals(5, processorNode.getMaxConcurrentTasks()); + assertEquals(SchedulingStrategy.CRON_DRIVEN, processorNode.getSchedulingStrategy()); + assertEquals("0 0 0 1/1 * ?",processorNode.getSchedulingPeriod()); + assertEquals("1 sec", processorNode.getYieldPeriod()); + assertEquals("30 sec", processorNode.getPenalizationPeriod()); + assertEquals(LogLevel.WARN, processorNode.getBulletinLevel()); + + // now change the type of the processor from DummyScheduledProcessor to DummySettingsProcessor + controller.changeProcessorType(processorNode, DummySettingsProcessor.class.getName(), coordinate); + + // ids and coordinate should stay the same + assertEquals(id, processorNode.getIdentifier()); + assertEquals(id, processorNode.getComponent().getIdentifier()); + assertEquals(coordinate.getCoordinate(), processorNode.getBundleCoordinate().getCoordinate()); + + // in this test we happened to change between two processors that have different canonical class names + // but in the running application the DAO layer would call verifyCanUpdateBundle and would prevent this so + // for the sake of this test it is ok that the canonical class name hasn't changed + assertEquals(originalName, processorNode.getName()); + assertEquals(DummyScheduledProcessor.class.getCanonicalName(), processorNode.getCanonicalClassName()); + assertEquals(DummyScheduledProcessor.class.getSimpleName(), processorNode.getComponentType()); + assertEquals(DummySettingsProcessor.class.getCanonicalName(), processorNode.getComponent().getClass().getCanonicalName()); + + // all these settings should have stayed the same + assertEquals(5, processorNode.getMaxConcurrentTasks()); + assertEquals(SchedulingStrategy.CRON_DRIVEN, processorNode.getSchedulingStrategy()); + assertEquals("0 0 0 1/1 * ?", processorNode.getSchedulingPeriod()); + assertEquals("1 sec", processorNode.getYieldPeriod()); + assertEquals("30 sec", processorNode.getPenalizationPeriod()); + assertEquals(LogLevel.WARN, processorNode.getBulletinLevel()); + } + + @Test + public void testChangeControllerServiceType() { + final String id = "ServiceA" + System.currentTimeMillis(); + final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate(); + final ControllerServiceNode controllerServiceNode = controller.createControllerService(ServiceA.class.getName(), id, coordinate, true); + final String originalName = controllerServiceNode.getName(); + + assertEquals(id, controllerServiceNode.getIdentifier()); + assertEquals(id, controllerServiceNode.getComponent().getIdentifier()); + assertEquals(coordinate.getCoordinate(), controllerServiceNode.getBundleCoordinate().getCoordinate()); + assertEquals(ServiceA.class.getCanonicalName(), controllerServiceNode.getCanonicalClassName()); + assertEquals(ServiceA.class.getSimpleName(), controllerServiceNode.getComponentType()); + assertEquals(ServiceA.class.getCanonicalName(), controllerServiceNode.getComponent().getClass().getCanonicalName()); + + controller.changeControllerServiceType(controllerServiceNode, ServiceB.class.getName(), coordinate); + + // ids and coordinate should stay the same + assertEquals(id, controllerServiceNode.getIdentifier()); + assertEquals(id, controllerServiceNode.getComponent().getIdentifier()); + assertEquals(coordinate.getCoordinate(), controllerServiceNode.getBundleCoordinate().getCoordinate()); + + // in this test we happened to change between two services that have different canonical class names + // but in the running application the DAO layer would call verifyCanUpdateBundle and would prevent this so + // for the sake of this test it is ok that the canonical class name hasn't changed + assertEquals(originalName, controllerServiceNode.getName()); + assertEquals(ServiceA.class.getCanonicalName(), controllerServiceNode.getCanonicalClassName()); + assertEquals(ServiceA.class.getSimpleName(), controllerServiceNode.getComponentType()); + assertEquals(ServiceB.class.getCanonicalName(), controllerServiceNode.getComponent().getClass().getCanonicalName()); + } + + @Test + public void testChangeReportingTaskType() throws ReportingTaskInstantiationException { + final String id = "ReportingTask" + System.currentTimeMillis(); + final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate(); + final ReportingTaskNode node = controller.createReportingTask(DummyReportingTask.class.getName(), id, coordinate, true); + final String originalName = node.getName(); + + assertEquals(id, node.getIdentifier()); + assertEquals(id, node.getComponent().getIdentifier()); + assertEquals(coordinate.getCoordinate(), node.getBundleCoordinate().getCoordinate()); + assertEquals(DummyReportingTask.class.getCanonicalName(), node.getCanonicalClassName()); + assertEquals(DummyReportingTask.class.getSimpleName(), node.getComponentType()); + assertEquals(DummyReportingTask.class.getCanonicalName(), node.getComponent().getClass().getCanonicalName()); + + controller.changeReportingTaskType(node, DummyScheduledReportingTask.class.getName(), coordinate); + + // ids and coordinate should stay the same + assertEquals(id, node.getIdentifier()); + assertEquals(id, node.getComponent().getIdentifier()); + assertEquals(coordinate.getCoordinate(), node.getBundleCoordinate().getCoordinate()); + + // in this test we happened to change between two services that have different canonical class names + // but in the running application the DAO layer would call verifyCanUpdateBundle and would prevent this so + // for the sake of this test it is ok that the canonical class name hasn't changed + assertEquals(originalName, node.getName()); + assertEquals(DummyReportingTask.class.getCanonicalName(), node.getCanonicalClassName()); + assertEquals(DummyReportingTask.class.getSimpleName(), node.getComponentType()); + assertEquals(DummyScheduledReportingTask.class.getCanonicalName(), node.getComponent().getClass().getCanonicalName()); + + } + + @Test(expected = IllegalArgumentException.class) + public void testInstantiateSnippetWhenProcessorMissingBundle() throws Exception { + final String id = UUID.randomUUID().toString(); + final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate(); + final ProcessorNode processorNode = controller.createProcessor(DummyProcessor.class.getName(), id, coordinate); + + // create a processor dto + final ProcessorDTO processorDTO = new ProcessorDTO(); + processorDTO.setId(UUID.randomUUID().toString()); // use a different id here + processorDTO.setPosition(new PositionDTO(new Double(0), new Double(0))); + processorDTO.setStyle(processorNode.getStyle()); + processorDTO.setParentGroupId("1234"); + processorDTO.setInputRequirement(processorNode.getInputRequirement().name()); + processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class)); + processorDTO.setRestricted(processorNode.isRestricted()); + processorDTO.setExtensionMissing(processorNode.isExtensionMissing()); + + processorDTO.setType(processorNode.getCanonicalClassName()); + processorDTO.setBundle(null); // missing bundle + processorDTO.setName(processorNode.getName()); + processorDTO.setState(processorNode.getScheduledState().toString()); + + processorDTO.setRelationships(new ArrayList<>()); + + processorDTO.setDescription("description"); + processorDTO.setSupportsParallelProcessing(!processorNode.isTriggeredSerially()); + processorDTO.setSupportsEventDriven(processorNode.isEventDrivenSupported()); + processorDTO.setSupportsBatching(processorNode.isHighThroughputSupported()); + + ProcessorConfigDTO configDTO = new ProcessorConfigDTO(); + configDTO.setSchedulingPeriod(processorNode.getSchedulingPeriod()); + configDTO.setPenaltyDuration(processorNode.getPenalizationPeriod()); + configDTO.setYieldDuration(processorNode.getYieldPeriod()); + configDTO.setRunDurationMillis(processorNode.getRunDuration(TimeUnit.MILLISECONDS)); + configDTO.setConcurrentlySchedulableTaskCount(processorNode.getMaxConcurrentTasks()); + configDTO.setLossTolerant(processorNode.isLossTolerant()); + configDTO.setComments(processorNode.getComments()); + configDTO.setBulletinLevel(processorNode.getBulletinLevel().name()); + configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name()); + configDTO.setExecutionNode(processorNode.getExecutionNode().name()); + configDTO.setAnnotationData(processorNode.getAnnotationData()); + + processorDTO.setConfig(configDTO); + + // create the snippet with the processor + final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO(); + flowSnippetDTO.setProcessors(Collections.singleton(processorDTO)); + + // instantiate the snippet + assertEquals(0, controller.getRootGroup().getProcessors().size()); + controller.instantiateSnippet(controller.getRootGroup(), flowSnippetDTO); + } + + @Test + public void testInstantiateSnippetWithProcessor() throws ProcessorInstantiationException { + final String id = UUID.randomUUID().toString(); + final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate(); + final ProcessorNode processorNode = controller.createProcessor(DummyProcessor.class.getName(), id, coordinate); + + // create a processor dto + final ProcessorDTO processorDTO = new ProcessorDTO(); + processorDTO.setId(UUID.randomUUID().toString()); // use a different id here + processorDTO.setPosition(new PositionDTO(new Double(0), new Double(0))); + processorDTO.setStyle(processorNode.getStyle()); + processorDTO.setParentGroupId("1234"); + processorDTO.setInputRequirement(processorNode.getInputRequirement().name()); + processorDTO.setPersistsState(processorNode.getProcessor().getClass().isAnnotationPresent(Stateful.class)); + processorDTO.setRestricted(processorNode.isRestricted()); + processorDTO.setExtensionMissing(processorNode.isExtensionMissing()); + + processorDTO.setType(processorNode.getCanonicalClassName()); + processorDTO.setBundle(new BundleDTO(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion())); + processorDTO.setName(processorNode.getName()); + processorDTO.setState(processorNode.getScheduledState().toString()); + + processorDTO.setRelationships(new ArrayList<>()); + + processorDTO.setDescription("description"); + processorDTO.setSupportsParallelProcessing(!processorNode.isTriggeredSerially()); + processorDTO.setSupportsEventDriven(processorNode.isEventDrivenSupported()); + processorDTO.setSupportsBatching(processorNode.isHighThroughputSupported()); + + ProcessorConfigDTO configDTO = new ProcessorConfigDTO(); + configDTO.setSchedulingPeriod(processorNode.getSchedulingPeriod()); + configDTO.setPenaltyDuration(processorNode.getPenalizationPeriod()); + configDTO.setYieldDuration(processorNode.getYieldPeriod()); + configDTO.setRunDurationMillis(processorNode.getRunDuration(TimeUnit.MILLISECONDS)); + configDTO.setConcurrentlySchedulableTaskCount(processorNode.getMaxConcurrentTasks()); + configDTO.setLossTolerant(processorNode.isLossTolerant()); + configDTO.setComments(processorNode.getComments()); + configDTO.setBulletinLevel(processorNode.getBulletinLevel().name()); + configDTO.setSchedulingStrategy(processorNode.getSchedulingStrategy().name()); + configDTO.setExecutionNode(processorNode.getExecutionNode().name()); + configDTO.setAnnotationData(processorNode.getAnnotationData()); + + processorDTO.setConfig(configDTO); + + // create the snippet with the processor + final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO(); + flowSnippetDTO.setProcessors(Collections.singleton(processorDTO)); + + // instantiate the snippet + assertEquals(0, controller.getRootGroup().getProcessors().size()); + controller.instantiateSnippet(controller.getRootGroup(), flowSnippetDTO); + assertEquals(1, controller.getRootGroup().getProcessors().size()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInstantiateSnippetWhenControllerServiceMissingBundle() throws ProcessorInstantiationException { + final String id = UUID.randomUUID().toString(); + final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate(); + final ControllerServiceNode controllerServiceNode = controller.createControllerService(ServiceA.class.getName(), id, coordinate, true); + + // create the controller service dto + final ControllerServiceDTO csDto = new ControllerServiceDTO(); + csDto.setId(UUID.randomUUID().toString()); // use a different id + csDto.setParentGroupId(controllerServiceNode.getProcessGroup() == null ? null : controllerServiceNode.getProcessGroup().getIdentifier()); + csDto.setName(controllerServiceNode.getName()); + csDto.setType(controllerServiceNode.getCanonicalClassName()); + csDto.setBundle(null); // missing bundle + csDto.setState(controllerServiceNode.getState().name()); + csDto.setAnnotationData(controllerServiceNode.getAnnotationData()); + csDto.setComments(controllerServiceNode.getComments()); + csDto.setPersistsState(controllerServiceNode.getControllerServiceImplementation().getClass().isAnnotationPresent(Stateful.class)); + csDto.setRestricted(controllerServiceNode.isRestricted()); + csDto.setExtensionMissing(controllerServiceNode.isExtensionMissing()); + csDto.setDescriptors(new LinkedHashMap<>()); + csDto.setProperties(new LinkedHashMap<>()); + + // create the snippet with the controller service + final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO(); + flowSnippetDTO.setControllerServices(Collections.singleton(csDto)); + + // instantiate the snippet + assertEquals(0, controller.getRootGroup().getControllerServices(false).size()); + controller.instantiateSnippet(controller.getRootGroup(), flowSnippetDTO); + } + + @Test + public void testInstantiateSnippetWithControllerService() throws ProcessorInstantiationException { + final String id = UUID.randomUUID().toString(); + final BundleCoordinate coordinate = systemBundle.getBundleDetails().getCoordinate(); + final ControllerServiceNode controllerServiceNode = controller.createControllerService(ServiceA.class.getName(), id, coordinate, true); + + // create the controller service dto + final ControllerServiceDTO csDto = new ControllerServiceDTO(); + csDto.setId(UUID.randomUUID().toString()); // use a different id + csDto.setParentGroupId(controllerServiceNode.getProcessGroup() == null ? null : controllerServiceNode.getProcessGroup().getIdentifier()); + csDto.setName(controllerServiceNode.getName()); + csDto.setType(controllerServiceNode.getCanonicalClassName()); + csDto.setBundle(new BundleDTO(coordinate.getGroup(), coordinate.getId(), coordinate.getVersion())); + csDto.setState(controllerServiceNode.getState().name()); + csDto.setAnnotationData(controllerServiceNode.getAnnotationData()); + csDto.setComments(controllerServiceNode.getComments()); + csDto.setPersistsState(controllerServiceNode.getControllerServiceImplementation().getClass().isAnnotationPresent(Stateful.class)); + csDto.setRestricted(controllerServiceNode.isRestricted()); + csDto.setExtensionMissing(controllerServiceNode.isExtensionMissing()); + csDto.setDescriptors(new LinkedHashMap<>()); + csDto.setProperties(new LinkedHashMap<>()); + + // create the snippet with the controller service + final FlowSnippetDTO flowSnippetDTO = new FlowSnippetDTO(); + flowSnippetDTO.setControllerServices(Collections.singleton(csDto)); + + // instantiate the snippet + assertEquals(0, controller.getRootGroup().getControllerServices(false).size()); + controller.instantiateSnippet(controller.getRootGroup(), flowSnippetDTO); + assertEquals(1, controller.getRootGroup().getControllerServices(false).size()); + } + }
http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java index e9623e3..a86b7b3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java @@ -20,6 +20,8 @@ package org.apache.nifi.controller; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; @@ -57,7 +59,6 @@ import java.net.URLClassLoader; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -81,15 +82,17 @@ public class TestStandardProcessorNode { @Test(timeout = 10000) public void testStart() throws InterruptedException { - System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessorNode.class.getResource("/conf/nifi.properties").getFile()); final ProcessorThatThrowsExceptionOnScheduled processor = new ProcessorThatThrowsExceptionOnScheduled(); final String uuid = UUID.randomUUID().toString(); ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(uuid, null, null, null, null); processor.initialize(initContext); - final StandardProcessorNode procNode = new StandardProcessorNode(processor, uuid, createValidationContextFactory(), null, null, - NiFiProperties.createBasicNiFiProperties(null, null), VariableRegistry.EMPTY_REGISTRY, Mockito.mock(ComponentLog.class)); + final BundleCoordinate coordinate = Mockito.mock(BundleCoordinate.class); + + final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, coordinate, null); + final StandardProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, createValidationContextFactory(), null, null, + NiFiProperties.createBasicNiFiProperties(null, null), VariableRegistry.EMPTY_REGISTRY); final ScheduledExecutorService taskScheduler = new FlowEngine(2, "TestClasspathResources", true); final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, null); @@ -140,12 +143,6 @@ public class TestStandardProcessorNode { final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp)); final StandardProcessorNode procNode = createProcessorNode(processor); - final Set<ClassLoader> classLoaders = new HashSet<>(); - classLoaders.add(procNode.getProcessor().getClass().getClassLoader()); - - // Load all of the extensions in src/test/java of this project - ExtensionManager.discoverExtensions(classLoaders); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ // Should have an InstanceClassLoader here final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); @@ -189,12 +186,6 @@ public class TestStandardProcessorNode { final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp, otherProp)); final StandardProcessorNode procNode = createProcessorNode(processor); - final Set<ClassLoader> classLoaders = new HashSet<>(); - classLoaders.add(procNode.getProcessor().getClass().getClassLoader()); - - // Load all of the extensions in src/test/java of this project - ExtensionManager.discoverExtensions(classLoaders); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ // Should have an InstanceClassLoader here final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); @@ -263,12 +254,6 @@ public class TestStandardProcessorNode { final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp1, classpathProp2)); final StandardProcessorNode procNode = createProcessorNode(processor); - final Set<ClassLoader> classLoaders = new HashSet<>(); - classLoaders.add(procNode.getProcessor().getClass().getClassLoader()); - - // Load all of the extensions in src/test/java of this project - ExtensionManager.discoverExtensions(classLoaders); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ // Should have an InstanceClassLoader here final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); @@ -315,12 +300,6 @@ public class TestStandardProcessorNode { final ModifiesClasspathProcessor processor = new ModifiesClasspathProcessor(Arrays.asList(classpathProp1, classpathProp2)); final StandardProcessorNode procNode = createProcessorNode(processor); - final Set<ClassLoader> classLoaders = new HashSet<>(); - classLoaders.add(procNode.getProcessor().getClass().getClassLoader()); - - // Load all of the extensions in src/test/java of this project - ExtensionManager.discoverExtensions(classLoaders); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ // Should have an InstanceClassLoader here final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); @@ -359,12 +338,6 @@ public class TestStandardProcessorNode { final ModifiesClasspathNoAnnotationProcessor processor = new ModifiesClasspathNoAnnotationProcessor(); final StandardProcessorNode procNode = createProcessorNode(processor); - final Set<ClassLoader> classLoaders = new HashSet<>(); - classLoaders.add(procNode.getProcessor().getClass().getClassLoader()); - - // Load all of the extensions in src/test/java of this project - ExtensionManager.discoverExtensions(classLoaders); - try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(procNode.getProcessor().getClass(), procNode.getIdentifier())){ // Can't validate the ClassLoader here b/c the class is missing the annotation @@ -392,18 +365,62 @@ public class TestStandardProcessorNode { } } + @Test + public void testVerifyCanUpdateBundle() { + final ModifiesClasspathNoAnnotationProcessor processor = new ModifiesClasspathNoAnnotationProcessor(); + final StandardProcessorNode procNode = createProcessorNode(processor); + final BundleCoordinate existingCoordinate = procNode.getBundleCoordinate(); + + // should be allowed to update when the bundle is the same + procNode.verifyCanUpdateBundle(existingCoordinate); + + // should be allowed to update when the group and id are the same but version is different + final BundleCoordinate diffVersion = new BundleCoordinate(existingCoordinate.getGroup(), existingCoordinate.getId(), "v2"); + assertTrue(!existingCoordinate.getVersion().equals(diffVersion.getVersion())); + procNode.verifyCanUpdateBundle(diffVersion); + + // should not be allowed to update when the bundle id is different + final BundleCoordinate diffId = new BundleCoordinate(existingCoordinate.getGroup(), "different-id", existingCoordinate.getVersion()); + assertTrue(!existingCoordinate.getId().equals(diffId.getId())); + try { + procNode.verifyCanUpdateBundle(diffId); + Assert.fail("Should have thrown exception"); + } catch (Exception e) { + + } + + // should not be allowed to update when the bundle group is different + final BundleCoordinate diffGroup = new BundleCoordinate("different-group", existingCoordinate.getId(), existingCoordinate.getVersion()); + assertTrue(!existingCoordinate.getGroup().equals(diffGroup.getGroup())); + try { + procNode.verifyCanUpdateBundle(diffGroup); + Assert.fail("Should have thrown exception"); + } catch (Exception e) { + + } + } + + @Test + public void testValidateControllerServiceApiRequired() { + + } + private StandardProcessorNode createProcessorNode(Processor processor) { final String uuid = UUID.randomUUID().toString(); final ValidationContextFactory validationContextFactory = createValidationContextFactory(); - final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, null); + final NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties("src/test/resources/conf/nifi.properties", null); final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class); final ComponentLog componentLog = Mockito.mock(ComponentLog.class); + final Bundle systemBundle = ExtensionManager.createSystemBundle(niFiProperties); + ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); + ExtensionManager.createInstanceClassLoader(processor.getClass().getName(), uuid, systemBundle); + ProcessorInitializationContext initContext = new StandardProcessorInitializationContext(uuid, componentLog, null, null, null); processor.initialize(initContext); - return new StandardProcessorNode(processor, uuid, validationContextFactory, processScheduler, null, - niFiProperties, variableRegistry, componentLog); + final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(processor, systemBundle.getBundleDetails().getCoordinate(), componentLog); + return new StandardProcessorNode(loggableComponent, uuid, validationContextFactory, processScheduler, null, niFiProperties, variableRegistry); } private boolean containsResource(URL[] resources, URL resourceToFind) { http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index 46d96be..5c8d447 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -16,35 +16,13 @@ */ package org.apache.nifi.controller.scheduling; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -import java.io.File; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; - import org.apache.commons.io.FileUtils; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.bundle.Bundle; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -60,6 +38,7 @@ import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.events.VolatileBulletinRepository; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -75,6 +54,29 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + /** * Validate Processor's life-cycle operation within the context of * {@link FlowController} and {@link StandardProcessScheduler} @@ -100,11 +102,12 @@ public class TestProcessorLifecycle { @Test public void validateEnableOperation() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), - UUID.randomUUID().toString()); + UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); @@ -121,11 +124,13 @@ public class TestProcessorLifecycle { @Test public void validateDisableOperation() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), - UUID.randomUUID().toString()); + UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); @@ -147,10 +152,13 @@ public class TestProcessorLifecycle { */ @Test public void validateIdempotencyOfProcessorStartOperation() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); @@ -173,10 +181,12 @@ public class TestProcessorLifecycle { */ @Test public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); @@ -196,10 +206,12 @@ public class TestProcessorLifecycle { @Test @Ignore public void validateSuccessfullAndOrderlyShutdown() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); @@ -239,10 +251,13 @@ public class TestProcessorLifecycle { @Test @Ignore public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); @@ -295,10 +310,13 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcessorFullyStarted() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); @@ -331,10 +349,13 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledException() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); @@ -363,10 +384,13 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); @@ -394,10 +418,13 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterruptable() throws Exception { - this.fc = buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run @@ -422,10 +449,13 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninterruptable() throws Exception { - this.fc = buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT, "5 sec"); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); // sets the scenario for the processor to run @@ -455,10 +485,13 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); @@ -484,10 +517,13 @@ public class TestProcessorLifecycle { */ @Test(expected = IllegalStateException.class) public void validateStartFailsOnInvalidProcessorWithMissingProperty() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); fail(); @@ -499,12 +535,16 @@ public class TestProcessorLifecycle { */ @Test(expected = IllegalStateException.class) public void validateStartFailsOnInvalidProcessorWithDisabledService() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "serv", true); - ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "serv", + fcsb.getSystemBundle().getBundleDetails().getCoordinate(), true); + ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); properties.put("S", testServiceNode.getIdentifier()); testProcNode.setProperties(properties); @@ -522,14 +562,18 @@ public class TestProcessorLifecycle { */ @Test public void validateStartSucceedsOnProcessorWithEnabledService() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "foo", true); + ControllerServiceNode testServiceNode = fc.createControllerService(TestService.class.getName(), "foo", + fcsb.getSystemBundle().getBundleDetails().getCoordinate(), true); testGroup.addControllerService(testServiceNode); - ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testGroup.addProcessor(testProcNode); properties.put("S", testServiceNode.getIdentifier()); @@ -554,15 +598,19 @@ public class TestProcessorLifecycle { */ @Test public void validateProcessorDeletion() throws Exception { - fc = this.buildFlowControllerForTest(); + final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); + fc = fcsb.getFlowController(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); this.setControllerRootGroup(fc, testGroup); - ProcessorNode testProcNodeA = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + ProcessorNode testProcNodeA = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNodeA.setProperties(properties); testGroup.addProcessor(testProcNodeA); - ProcessorNode testProcNodeB = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + ProcessorNode testProcNodeB = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), + fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNodeB.setProperties(properties); testGroup.addProcessor(testProcNodeB); @@ -670,7 +718,7 @@ public class TestProcessorLifecycle { testProcessor.setScenario(emptyRunnable, emptyRunnable, emptyRunnable, emptyRunnable); } - private FlowController buildFlowControllerForTest(final String propKey, final String propValue) throws Exception { + private FlowControllerAndSystemBundle buildFlowControllerForTest(final String propKey, final String propValue) throws Exception { final Map<String, String> addProps = new HashMap<>(); addProps.put(NiFiProperties.ADMINISTRATIVE_YIELD_DURATION, "1 sec"); addProps.put(NiFiProperties.STATE_MANAGEMENT_CONFIG_FILE, "target/test-classes/state-management.xml"); @@ -682,12 +730,18 @@ public class TestProcessorLifecycle { addProps.put(propKey, propValue); } final NiFiProperties nifiProperties = NiFiProperties.createBasicNiFiProperties(null, addProps); - return FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties, + + final Bundle systemBundle = ExtensionManager.createSystemBundle(nifiProperties); + ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); + + final FlowController flowController = FlowController.createStandaloneInstance(mock(FlowFileEventRepository.class), nifiProperties, mock(Authorizer.class), mock(AuditService.class), null, new VolatileBulletinRepository(), new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths())); + + return new FlowControllerAndSystemBundle(flowController, systemBundle); } - private FlowController buildFlowControllerForTest() throws Exception { + private FlowControllerAndSystemBundle buildFlowControllerForTest() throws Exception { return buildFlowControllerForTest(null, null); } @@ -705,6 +759,25 @@ public class TestProcessorLifecycle { } } + private static class FlowControllerAndSystemBundle { + + private final FlowController flowController; + private final Bundle systemBundle; + + public FlowControllerAndSystemBundle(FlowController flowController, Bundle systemBundle) { + this.flowController = flowController; + this.systemBundle = systemBundle; + } + + public FlowController getFlowController() { + return flowController; + } + + public Bundle getSystemBundle() { + return systemBundle; + } + } + /** */ public static class TestProcessor extends AbstractProcessor { http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index ee2b103..b69701e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -16,29 +16,16 @@ */ package org.apache.nifi.controller.scheduling; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.bundle.Bundle; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.LoggableComponent; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; @@ -55,6 +42,7 @@ import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.controller.service.mock.MockProcessGroup; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -67,6 +55,7 @@ import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.NiFiProperties; import org.junit.Before; @@ -74,6 +63,23 @@ import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class TestStandardProcessScheduler { private StandardProcessScheduler scheduler = null; @@ -84,11 +90,17 @@ public class TestStandardProcessScheduler { private FlowController controller; private ProcessGroup rootGroup; private NiFiProperties nifiProperties; + private Bundle systemBundle; @Before public void setup() throws InitializationException { System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessScheduler.class.getResource("/nifi.properties").getFile()); this.nifiProperties = NiFiProperties.createBasicNiFiProperties(null, null); + + // load the system bundle + systemBundle = ExtensionManager.createSystemBundle(nifiProperties); + ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet()); + scheduler = new StandardProcessScheduler(Mockito.mock(ControllerServiceProvider.class), null, stateMgrProvider, variableRegistry, nifiProperties); scheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, Mockito.mock(SchedulingAgent.class)); @@ -99,7 +111,8 @@ public class TestStandardProcessScheduler { final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(null, variableRegistry); final ComponentLog logger = Mockito.mock(ComponentLog.class); - taskNode = new StandardReportingTaskNode(reportingTask, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, variableRegistry, logger); + final LoggableComponent<ReportingTask> loggableComponent = new LoggableComponent<>(reportingTask, systemBundle.getBundleDetails().getCoordinate(), logger); + taskNode = new StandardReportingTaskNode(loggableComponent, UUID.randomUUID().toString(), null, scheduler, validationContextFactory, variableRegistry); controller = Mockito.mock(FlowController.class); rootGroup = new MockProcessGroup(); @@ -139,13 +152,14 @@ public class TestStandardProcessScheduler { final StandardControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(controller, scheduler, null, Mockito.mock(StateManagerProvider.class), variableRegistry, nifiProperties); - final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", true); + final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", + systemBundle.getBundleDetails().getCoordinate(), true); rootGroup.addControllerService(service); - final ProcessorNode procNode = new StandardProcessorNode(proc, uuid, + final LoggableComponent<Processor> loggableComponent = new LoggableComponent<>(proc, systemBundle.getBundleDetails().getCoordinate(), null); + final ProcessorNode procNode = new StandardProcessorNode(loggableComponent, uuid, new StandardValidationContextFactory(serviceProvider, variableRegistry), - scheduler, serviceProvider, nifiProperties, VariableRegistry.EMPTY_REGISTRY, - Mockito.mock(ComponentLog.class)); + scheduler, serviceProvider, nifiProperties, VariableRegistry.EMPTY_REGISTRY); rootGroup.addProcessor(procNode); Map<String,String> procProps = new HashMap<>(); @@ -219,7 +233,7 @@ public class TestStandardProcessScheduler { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), - "1", false); + "1", systemBundle.getBundleDetails().getCoordinate(), false); assertFalse(serviceNode.isActive()); final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); final ExecutorService executor = Executors.newCachedThreadPool(); @@ -258,7 +272,7 @@ public class TestStandardProcessScheduler { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), - "1", false); + "1", systemBundle.getBundleDetails().getCoordinate(), false); final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); final ExecutorService executor = Executors.newCachedThreadPool(); @@ -296,7 +310,7 @@ public class TestStandardProcessScheduler { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(SimpleTestService.class.getName(), - "1", false); + "1", systemBundle.getBundleDetails().getCoordinate(), false); final SimpleTestService ts = (SimpleTestService) serviceNode.getControllerServiceImplementation(); scheduler.enableControllerService(serviceNode); assertTrue(serviceNode.isActive()); @@ -330,7 +344,7 @@ public class TestStandardProcessScheduler { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(FailingService.class.getName(), - "1", false); + "1", systemBundle.getBundleDetails().getCoordinate(), false); scheduler.enableControllerService(serviceNode); Thread.sleep(1000); scheduler.shutdown(); @@ -363,8 +377,8 @@ public class TestStandardProcessScheduler { final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 200; i++) { - final ControllerServiceNode serviceNode = provider - .createControllerService(RandomShortDelayEnablingService.class.getName(), "1", false); + final ControllerServiceNode serviceNode = provider.createControllerService(RandomShortDelayEnablingService.class.getName(), "1", + systemBundle.getBundleDetails().getCoordinate(), false); executor.execute(new Runnable() { @Override @@ -405,7 +419,7 @@ public class TestStandardProcessScheduler { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), - "1", false); + "1", systemBundle.getBundleDetails().getCoordinate(), false); final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); ts.setLimit(Long.MAX_VALUE); scheduler.enableControllerService(serviceNode); @@ -431,7 +445,7 @@ public class TestStandardProcessScheduler { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(controller, scheduler, null, stateMgrProvider, variableRegistry, nifiProperties); final ControllerServiceNode serviceNode = provider.createControllerService(LongEnablingService.class.getName(), - "1", false); + "1", systemBundle.getBundleDetails().getCoordinate(), false); final LongEnablingService ts = (LongEnablingService) serviceNode.getControllerServiceImplementation(); ts.setLimit(3000); scheduler.enableControllerService(serviceNode); http://git-wip-us.apache.org/repos/asf/nifi/blob/d90cf846/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java index 0e4571a..15c35d9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller.service; +import org.apache.nifi.bundle.Bundle; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.ControllerService; @@ -30,19 +31,26 @@ import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; + public class StandardControllerServiceProviderTest { private ControllerService proxied; private ControllerService implementation; private static VariableRegistry variableRegistry; private static NiFiProperties nifiProperties; + private static Bundle systemBundle; @BeforeClass public static void setupSuite() throws Exception { System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, StandardControllerServiceProviderTest.class.getResource("/conf/nifi.properties").getFile()); nifiProperties = NiFiProperties.createBasicNiFiProperties(null, null); + NarClassLoaders.getInstance().init(nifiProperties.getFrameworkWorkingDirectory(), nifiProperties.getExtensionsWorkingDirectory()); - ExtensionManager.discoverExtensions(NarClassLoaders.getInstance().getExtensionClassLoaders()); + + // load the system bundle + systemBundle = ExtensionManager.createSystemBundle(nifiProperties); + ExtensionManager.discoverExtensions(systemBundle, NarClassLoaders.getInstance().getBundles()); + variableRegistry = new FileBasedVariableRegistry(nifiProperties.getVariableRegistryPropertiesPaths()); } @@ -72,7 +80,7 @@ public class StandardControllerServiceProviderTest { public void onComponentRemoved(String componentId) { } }, variableRegistry, nifiProperties); - ControllerServiceNode node = provider.createControllerService(clazz, id, true); + ControllerServiceNode node = provider.createControllerService(clazz, id, systemBundle.getBundleDetails().getCoordinate(), true); proxied = node.getProxiedControllerService(); implementation = node.getControllerServiceImplementation(); }
