http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index ca68725..3486875 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -257,7 +257,7 @@ public class TestStandardProcessSession { Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); } } - + private void assertDisabled(final InputStream inputStream) { try { inputStream.read(); @@ -289,8 +289,8 @@ public class TestStandardProcessSession { } catch (final Exception ex) { Assert.assertEquals(FlowFileAccessException.class, ex.getClass()); } - } - + } + @Test public void testAppendAfterSessionClosesStream() throws IOException { final ContentClaim claim = contentRepo.create(false); @@ -355,7 +355,7 @@ public class TestStandardProcessSession { }); assertDisabled(inputStreamHolder.get()); assertDisabled(outputStreamHolder.get()); - } + } @Test public void testWriteAfterSessionClosesStream() throws IOException { @@ -426,7 +426,6 @@ public class TestStandardProcessSession { assertEquals(0, provenanceRepo.getEvents(0L, 100000).size()); } - @Test public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() @@ -466,59 +465,59 @@ public class TestStandardProcessSession { @Test public void testUpdateAttributesThenJoin() throws IOException { final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder() - .id(1L) - .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") - .entryDate(System.currentTimeMillis()) - .build(); - + .id(1L) + .addAttribute("uuid", "11111111-1111-1111-1111-111111111111") + .entryDate(System.currentTimeMillis()) + .build(); + final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .id(2L) - .addAttribute("uuid", "22222222-2222-2222-2222-222222222222") - .entryDate(System.currentTimeMillis()) - .build(); - + .id(2L) + .addAttribute("uuid", "22222222-2222-2222-2222-222222222222") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord1); flowFileQueue.put(flowFileRecord2); - + FlowFile ff1 = session.get(); FlowFile ff2 = session.get(); ff1 = session.putAttribute(ff1, "index", "1"); ff2 = session.putAttribute(ff2, "index", "2"); - + final List<FlowFile> parents = new ArrayList<>(2); parents.add(ff1); parents.add(ff2); - + final FlowFile child = session.create(parents); - + final Relationship rel = new Relationship.Builder().name("A").build(); - + session.transfer(ff1, rel); session.transfer(ff2, rel); session.transfer(child, rel); - + session.commit(); - + final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000); // We should have a JOIN and 2 ATTRIBUTE_MODIFIED's assertEquals(3, events.size()); - + int joinCount = 0; int ff1UpdateCount = 0; int ff2UpdateCount = 0; - - for ( final ProvenanceEventRecord event : events ) { + + for (final ProvenanceEventRecord event : events) { switch (event.getEventType()) { case JOIN: assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid()); joinCount++; break; case ATTRIBUTES_MODIFIED: - if ( event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) { + if (event.getFlowFileUuid().equals(ff1.getAttribute("uuid"))) { ff1UpdateCount++; - } else if ( event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) { + } else if (event.getFlowFileUuid().equals(ff2.getAttribute("uuid"))) { ff2UpdateCount++; } else { Assert.fail("Got ATTRIBUTE_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid()); @@ -528,14 +527,14 @@ public class TestStandardProcessSession { Assert.fail("Unexpected event type: " + event); } } - + assertEquals(1, joinCount); assertEquals(1, ff1UpdateCount); assertEquals(1, ff2UpdateCount); - + assertEquals(1, joinCount); } - + @Test public void testForkOneToOneReported() throws IOException { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() @@ -845,34 +844,34 @@ public class TestStandardProcessSession { @Test public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }).build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new ContentClaim() { + @Override + public int compareTo(ContentClaim arg0) { + return 0; + } + + @Override + public String getId() { + return "0"; + } + + @Override + public String getContainer() { + return "container"; + } + + @Override + public String getSection() { + return "section"; + } + + @Override + public boolean isLossTolerant() { + return true; + } + }).build(); flowFileQueue.put(flowFileRecord); FlowFile ff1 = session.get(); @@ -885,35 +884,35 @@ public class TestStandardProcessSession { session.commit(); final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() - .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") - .entryDate(System.currentTimeMillis()) - .contentClaim(new ContentClaim() { - @Override - public int compareTo(ContentClaim arg0) { - return 0; - } - - @Override - public String getId() { - return "0"; - } - - @Override - public String getContainer() { - return "container"; - } - - @Override - public String getSection() { - return "section"; - } - - @Override - public boolean isLossTolerant() { - return true; - } - }) - .contentClaimOffset(1000L).size(1L).build(); + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new ContentClaim() { + @Override + public int compareTo(ContentClaim arg0) { + return 0; + } + + @Override + public String getId() { + return "0"; + } + + @Override + public String getContainer() { + return "container"; + } + + @Override + public String getSection() { + return "section"; + } + + @Override + public boolean isLossTolerant() { + return true; + } + }) + .contentClaimOffset(1000L).size(1L).build(); flowFileQueue.put(flowFileRecord2); // attempt to read the data. @@ -974,21 +973,20 @@ public class TestStandardProcessSession { } } - @Test public void testCreateEmitted() throws IOException { FlowFile newFlowFile = session.create(); session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - + final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); - + final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.CREATE, event.getEventType()); } - + @Test public void testContentModifiedNotEmittedForCreate() throws IOException { FlowFile newFlowFile = session.create(); @@ -999,23 +997,23 @@ public class TestStandardProcessSession { }); session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - + final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); - + final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.CREATE, event.getEventType()); } - + @Test public void testContentModifiedEmittedAndNotAttributesModified() throws IOException { final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() - .id(1L) - .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") - .build(); + .id(1L) + .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") + .build(); this.flowFileQueue.put(flowFile); - + FlowFile existingFlowFile = session.get(); existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() { @Override @@ -1025,38 +1023,36 @@ public class TestStandardProcessSession { existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - + final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); - + final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType()); } - + @Test public void testAttributesModifiedEmitted() throws IOException { final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder() - .id(1L) - .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") - .build(); + .id(1L) + .addAttribute("uuid", "000000000000-0000-0000-0000-00000000") + .build(); this.flowFileQueue.put(flowFile); - + FlowFile existingFlowFile = session.get(); existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - + final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); - + final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType()); } - - - + private static class MockFlowFileRepository implements FlowFileRepository { private final AtomicLong idGenerator = new AtomicLong(0L); @@ -1123,7 +1119,7 @@ public class TestStandardProcessSession { @Override public void shutdown() { } - + public Set<ContentClaim> getExistingClaims() { final Set<ContentClaim> claims = new HashSet<>(); @@ -1146,7 +1142,7 @@ public class TestStandardProcessSession { if (Files.exists(parent) == false) { Files.createDirectories(parent); } - Files.createFile(getPath(claim)); + Files.createFile(getPath(claim)); return claim; }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java index 7fef706..acd9993 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java @@ -27,45 +27,45 @@ import org.junit.BeforeClass; import org.junit.Test; public class StandardControllerServiceProviderTest { - + private ControllerService proxied; private ControllerService implementation; - + @BeforeClass public static void setupSuite() throws Exception { System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, StandardFlowServiceTest.class.getResource("/conf/nifi.properties").getFile()); NiFiProperties properties = NiFiProperties.getInstance(); NarClassLoaders.load(properties); - ExtensionManager.discoverExtensions(); + ExtensionManager.discoverExtensions(); } @Before public void setup() throws Exception { - String id = "id"; - String clazz = "org.apache.nifi.controller.service.util.TestControllerService"; - ControllerServiceProvider provider = new StandardControllerServiceProvider(null, null); - ControllerServiceNode node = provider.createControllerService(clazz,id,true); - proxied = node.getProxiedControllerService(); - implementation = node.getControllerServiceImplementation(); + String id = "id"; + String clazz = "org.apache.nifi.controller.service.util.TestControllerService"; + ControllerServiceProvider provider = new StandardControllerServiceProvider(null, null); + ControllerServiceNode node = provider.createControllerService(clazz, id, true); + proxied = node.getProxiedControllerService(); + implementation = node.getControllerServiceImplementation(); } - - @Test (expected=UnsupportedOperationException.class) - public void testCallProxiedOnPropertyModified() { - proxied.onPropertyModified(null, "oldValue", "newValue"); + + @Test(expected = UnsupportedOperationException.class) + public void testCallProxiedOnPropertyModified() { + proxied.onPropertyModified(null, "oldValue", "newValue"); } - + @Test - public void testCallImplementationOnPropertyModified() { - implementation.onPropertyModified(null, "oldValue", "newValue"); + public void testCallImplementationOnPropertyModified() { + implementation.onPropertyModified(null, "oldValue", "newValue"); } - - @Test (expected=UnsupportedOperationException.class) - public void testCallProxiedInitialized() throws InitializationException { - proxied.initialize(null); + + @Test(expected = UnsupportedOperationException.class) + public void testCallProxiedInitialized() throws InitializationException { + proxied.initialize(null); } - + @Test - public void testCallImplementationInitialized() throws InitializationException { - implementation.initialize(null); + public void testCallImplementationInitialized() throws InitializationException { + implementation.initialize(null); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java index 3dc1752..03aca7e 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java @@ -54,7 +54,7 @@ public class TestStandardControllerServiceProvider { return null; } }).when(scheduler).enableControllerService(Mockito.any(ControllerServiceNode.class)); - + Mockito.doAnswer(new Answer<Object>() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { @@ -64,55 +64,54 @@ public class TestStandardControllerServiceProvider { return null; } }).when(scheduler).disableControllerService(Mockito.any(ControllerServiceNode.class)); - + return scheduler; } - + @Test public void testDisableControllerService() { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); - + final ControllerServiceNode serviceNode = provider.createControllerService(ServiceB.class.getName(), "B", false); provider.enableControllerService(serviceNode); provider.disableControllerService(serviceNode); } - + @Test public void testEnableDisableWithReference() { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); - + final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false); final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false); - + serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B"); - + try { provider.enableControllerService(serviceNodeA); Assert.fail("Was able to enable Service A but Service B is disabled."); } catch (final IllegalStateException expected) { } - + provider.enableControllerService(serviceNodeB); provider.enableControllerService(serviceNodeA); - + try { provider.disableControllerService(serviceNodeB); Assert.fail("Was able to disable Service B but Service A is enabled and references B"); } catch (final IllegalStateException expected) { } - + provider.disableControllerService(serviceNodeA); provider.disableControllerService(serviceNodeB); } - - + @Test public void testEnableReferencingServicesGraph() { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); - + // build a graph of controller services with dependencies as such: // // A -> B -> D @@ -125,31 +124,29 @@ public class TestStandardControllerServiceProvider { // So we have to verify that if D is enabled, when we enable its referencing services, // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so // until B is first enabled so ensure that we enable B first. - final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false); final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); - + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); - + provider.enableControllerService(serviceNode4); provider.enableReferencingServices(serviceNode4); - + assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState()); assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState()); assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState()); } - - + @Test public void testStartStopReferencingComponents() { final ProcessScheduler scheduler = createScheduler(); final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null); - + // build a graph of reporting tasks and controller services with dependencies as such: // // Processor P1 -> A -> B -> D @@ -162,12 +159,11 @@ public class TestStandardControllerServiceProvider { // So we have to verify that if D is enabled, when we enable its referencing services, // we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so // until B is first enabled so ensure that we enable B first. - final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false); final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false); final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false); final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false); - + final ProcessGroup mockProcessGroup = Mockito.mock(ProcessGroup.class); Mockito.doAnswer(new Answer<Object>() { @Override @@ -178,7 +174,7 @@ public class TestStandardControllerServiceProvider { return null; } }).when(mockProcessGroup).startProcessor(Mockito.any(ProcessorNode.class)); - + Mockito.doAnswer(new Answer<Object>() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { @@ -188,36 +184,36 @@ public class TestStandardControllerServiceProvider { return null; } }).when(mockProcessGroup).stopProcessor(Mockito.any(ProcessorNode.class)); - + final String id1 = UUID.randomUUID().toString(); final ProcessorNode procNodeA = new StandardProcessorNode(new DummyProcessor(), id1, new StandardValidationContextFactory(provider), scheduler, provider); procNodeA.getProcessor().initialize(new StandardProcessorInitializationContext(id1, null, provider)); procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1"); procNodeA.setProcessGroup(mockProcessGroup); - + final String id2 = UUID.randomUUID().toString(); - final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(),id2, + final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(), id2, new StandardValidationContextFactory(provider), scheduler, provider); procNodeB.getProcessor().initialize(new StandardProcessorInitializationContext(id2, null, provider)); procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3"); procNodeB.setProcessGroup(mockProcessGroup); - + serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4"); serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4"); - + provider.enableControllerService(serviceNode4); provider.enableReferencingServices(serviceNode4); provider.scheduleReferencingComponents(serviceNode4); - + assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState()); assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState()); assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState()); assertTrue(procNodeA.isRunning()); assertTrue(procNodeB.isRunning()); - + // stop processors and verify results. provider.unscheduleReferencingComponents(serviceNode4); assertFalse(procNodeA.isRunning()); @@ -225,18 +221,17 @@ public class TestStandardControllerServiceProvider { assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState()); assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState()); assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState()); - + provider.disableReferencingServices(serviceNode4); assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState()); assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState()); assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState()); assertEquals(ControllerServiceState.ENABLED, serviceNode4.getState()); - + provider.disableControllerService(serviceNode4); assertEquals(ControllerServiceState.DISABLED, serviceNode4.getState()); } - - + @Test public void testOrderingOfServices() { final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null); @@ -248,7 +243,7 @@ public class TestStandardControllerServiceProvider { final Map<String, ControllerServiceNode> nodeMap = new LinkedHashMap<>(); nodeMap.put("1", serviceNode1); nodeMap.put("2", serviceNode2); - + List<List<ControllerServiceNode>> branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); assertEquals(2, branches.size()); List<ControllerServiceNode> ordered = branches.get(0); @@ -257,11 +252,11 @@ public class TestStandardControllerServiceProvider { assertTrue(ordered.get(1) == serviceNode1); assertEquals(1, branches.get(1).size()); assertTrue(branches.get(1).get(0) == serviceNode2); - + nodeMap.clear(); nodeMap.put("2", serviceNode2); nodeMap.put("1", serviceNode1); - + branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); assertEquals(2, branches.size()); ordered = branches.get(1); @@ -270,20 +265,20 @@ public class TestStandardControllerServiceProvider { assertTrue(ordered.get(1) == serviceNode1); assertEquals(1, branches.get(0).size()); assertTrue(branches.get(0).get(0) == serviceNode2); - + // add circular dependency on self. nodeMap.clear(); serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1"); nodeMap.put("1", serviceNode1); nodeMap.put("2", serviceNode2); - + branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); assertEquals(2, branches.size()); ordered = branches.get(0); assertEquals(2, ordered.size()); assertTrue(ordered.get(0) == serviceNode2); assertTrue(ordered.get(1) == serviceNode1); - + nodeMap.clear(); nodeMap.put("2", serviceNode2); nodeMap.put("1", serviceNode1); @@ -293,7 +288,7 @@ public class TestStandardControllerServiceProvider { assertEquals(2, ordered.size()); assertTrue(ordered.get(0) == serviceNode2); assertTrue(ordered.get(1) == serviceNode1); - + // add circular dependency once removed. In this case, we won't actually be able to enable these because of the // circular dependency because they will never be valid because they will always depend on a disabled service. // But we want to ensure that the method returns successfully without throwing a StackOverflowException or anything @@ -310,7 +305,7 @@ public class TestStandardControllerServiceProvider { assertEquals(2, ordered.size()); assertTrue(ordered.get(0) == serviceNode3); assertTrue(ordered.get(1) == serviceNode1); - + nodeMap.clear(); nodeMap.put("3", serviceNode3); nodeMap.put("1", serviceNode1); @@ -320,8 +315,7 @@ public class TestStandardControllerServiceProvider { assertEquals(2, ordered.size()); assertTrue(ordered.get(0) == serviceNode3); assertTrue(ordered.get(1) == serviceNode1); - - + // Add multiple completely disparate branches. nodeMap.clear(); serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); @@ -333,7 +327,7 @@ public class TestStandardControllerServiceProvider { nodeMap.put("3", serviceNode3); nodeMap.put("4", serviceNode4); nodeMap.put("5", serviceNode5); - + branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); assertEquals(5, branches.size()); @@ -341,21 +335,21 @@ public class TestStandardControllerServiceProvider { assertEquals(2, ordered.size()); assertTrue(ordered.get(0) == serviceNode2); assertTrue(ordered.get(1) == serviceNode1); - + assertEquals(1, branches.get(1).size()); assertTrue(branches.get(1).get(0) == serviceNode2); - + ordered = branches.get(2); assertEquals(2, ordered.size()); assertTrue(ordered.get(0) == serviceNode4); assertTrue(ordered.get(1) == serviceNode3); - + assertEquals(1, branches.get(3).size()); assertTrue(branches.get(3).get(0) == serviceNode4); - + assertEquals(1, branches.get(4).size()); assertTrue(branches.get(4).get(0) == serviceNode5); - + // create 2 branches both dependent on the same service nodeMap.clear(); serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2"); @@ -363,19 +357,19 @@ public class TestStandardControllerServiceProvider { nodeMap.put("1", serviceNode1); nodeMap.put("2", serviceNode2); nodeMap.put("3", serviceNode3); - + branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap); assertEquals(3, branches.size()); - + ordered = branches.get(0); assertEquals(2, ordered.size()); assertTrue(ordered.get(0) == serviceNode2); assertTrue(ordered.get(1) == serviceNode1); - + ordered = branches.get(1); assertEquals(1, ordered.size()); assertTrue(ordered.get(0) == serviceNode2); - + ordered = branches.get(2); assertEquals(2, ordered.size()); assertTrue(ordered.get(0) == serviceNode2); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java index 615e172..13898a5 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java @@ -29,19 +29,18 @@ import org.apache.nifi.processor.exception.ProcessException; public class DummyProcessor extends AbstractProcessor { public static final PropertyDescriptor SERVICE = new PropertyDescriptor.Builder() - .name("Controller Service") - .identifiesControllerService(ControllerService.class) - .required(true) - .build(); - - + .name("Controller Service") + .identifiesControllerService(ControllerService.class) + .required(true) + .build(); + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> descriptors = new ArrayList<>(); descriptors.add(SERVICE); return descriptors; } - + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java index 4918468..f93184b 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java @@ -26,18 +26,17 @@ import org.apache.nifi.controller.ControllerService; public class ServiceA extends AbstractControllerService { public static final PropertyDescriptor OTHER_SERVICE = new PropertyDescriptor.Builder() - .name("Other Service") - .identifiesControllerService(ControllerService.class) - .required(true) - .build(); - + .name("Other Service") + .identifiesControllerService(ControllerService.class) + .required(true) + .build(); + public static final PropertyDescriptor OTHER_SERVICE_2 = new PropertyDescriptor.Builder() - .name("Other Service 2") - .identifiesControllerService(ControllerService.class) - .required(false) - .build(); + .name("Other Service 2") + .identifiesControllerService(ControllerService.class) + .required(false) + .build(); - @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> descriptors = new ArrayList<>(); @@ -45,5 +44,5 @@ public class ServiceA extends AbstractControllerService { descriptors.add(OTHER_SERVICE_2); return descriptors; } - + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java index 95200a0..65ef13f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java @@ -28,34 +28,34 @@ import org.apache.nifi.reporting.InitializationException; public class TestControllerService implements ControllerService { - @Override - public Collection<ValidationResult> validate(ValidationContext context) { - return null; - } - - @Override - public PropertyDescriptor getPropertyDescriptor(String name) { - return null; - } - - @Override - public void onPropertyModified(PropertyDescriptor descriptor, - String oldValue, String newValue) { - } - - @Override - public List<PropertyDescriptor> getPropertyDescriptors() { - return null; - } - - @Override - public String getIdentifier() { - return null; - } - - @Override - public void initialize(ControllerServiceInitializationContext context) - throws InitializationException { - } + @Override + public Collection<ValidationResult> validate(ValidationContext context) { + return null; + } + + @Override + public PropertyDescriptor getPropertyDescriptor(String name) { + return null; + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, + String oldValue, String newValue) { + } + + @Override + public List<PropertyDescriptor> getPropertyDescriptors() { + return null; + } + + @Override + public String getIdentifier() { + return null; + } + + @Override + public void initialize(ControllerServiceInitializationContext context) + throws InitializationException { + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java index a0bf30d..be40e90 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java @@ -165,9 +165,9 @@ public class TestStandardPropertyValue { @Override public String getControllerServiceName(String serviceIdentifier) { - return null; + return null; } - + @Override public boolean isControllerServiceEnabling(String serviceIdentifier) { return false; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml index 46a1aca..0406ed6 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml @@ -32,7 +32,7 @@ <artifactId>nifi-properties</artifactId> <scope>compile</scope> </dependency> - <dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-documentation</artifactId> <scope>compile</scope> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java index 590797c..c1bdf97 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java @@ -47,166 +47,165 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class BootstrapListener { - private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class); - - private final NiFi nifi; - private final int bootstrapPort; - private final String secretKey; - - private volatile Listener listener; - private volatile ServerSocket serverSocket; - - - public BootstrapListener(final NiFi nifi, final int bootstrapPort) { - this.nifi = nifi; - this.bootstrapPort = bootstrapPort; - secretKey = UUID.randomUUID().toString(); - } - - public void start() throws IOException { - logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort); - - serverSocket = new ServerSocket(); - serverSocket.bind(new InetSocketAddress("localhost", 0)); - serverSocket.setSoTimeout(2000); - - final int localPort = serverSocket.getLocalPort(); - logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort); - - listener = new Listener(serverSocket); - final Thread listenThread = new Thread(listener); - listenThread.setDaemon(true); - listenThread.setName("Listen to Bootstrap"); - listenThread.start(); - - logger.debug("Notifying Bootstrap that local port is {}", localPort); - try (final Socket socket = new Socket()) { - socket.setSoTimeout(60000); - socket.connect(new InetSocketAddress("localhost", bootstrapPort)); - socket.setSoTimeout(60000); - - final OutputStream out = socket.getOutputStream(); - out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); - out.flush(); - - logger.debug("Awaiting response from Bootstrap..."); - final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); - final String response = reader.readLine(); - if ("OK".equals(response)) { - logger.info("Successfully initiated communication with Bootstrap"); - } else { - logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from NiFi"); - } - } - } - - - public void stop() { - if (listener != null) { - listener.stop(); - } - } - - private class Listener implements Runnable { - private final ServerSocket serverSocket; - private final ExecutorService executor; - private volatile boolean stopped = false; - - public Listener(final ServerSocket serverSocket) { - this.serverSocket = serverSocket; - this.executor = Executors.newFixedThreadPool(2); - } - - public void stop() { - stopped = true; - - executor.shutdownNow(); - - try { - serverSocket.close(); - } catch (final IOException ioe) { - // nothing to really do here. we could log this, but it would just become - // confusing in the logs, as we're shutting down and there's no real benefit - } - } - - @Override - public void run() { - while (!stopped) { - try { - final Socket socket; - try { - logger.debug("Listening for Bootstrap Requests"); - socket = serverSocket.accept(); - } catch (final SocketTimeoutException ste) { - if ( stopped ) { - return; - } - - continue; - } catch (final IOException ioe) { - if ( stopped ) { - return; - } - - throw ioe; - } - - logger.debug("Received connection from Bootstrap"); - socket.setSoTimeout(5000); - - executor.submit(new Runnable() { - @Override - public void run() { - try { - final BootstrapRequest request = readRequest(socket.getInputStream()); - final BootstrapRequest.RequestType requestType = request.getRequestType(); - - switch (requestType) { - case PING: - logger.debug("Received PING request from Bootstrap; responding"); - echoPing(socket.getOutputStream()); - logger.debug("Responded to PING request from Bootstrap"); - break; - case SHUTDOWN: - logger.info("Received SHUTDOWN request from Bootstrap"); - echoShutdown(socket.getOutputStream()); - nifi.shutdownHook(); - return; - case DUMP: - logger.info("Received DUMP request from Bootstrap"); - writeDump(socket.getOutputStream()); - break; - } - } catch (final Throwable t) { - logger.error("Failed to process request from Bootstrap due to " + t.toString(), t); - } finally { - try { - socket.close(); - } catch (final IOException ioe) { - logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString()); - } - } - } - }); - } catch (final Throwable t) { - logger.error("Failed to process request from Bootstrap due to " + t.toString(), t); - } - } - } - } - - - private static void writeDump(final OutputStream out) throws IOException { + + private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class); + + private final NiFi nifi; + private final int bootstrapPort; + private final String secretKey; + + private volatile Listener listener; + private volatile ServerSocket serverSocket; + + public BootstrapListener(final NiFi nifi, final int bootstrapPort) { + this.nifi = nifi; + this.bootstrapPort = bootstrapPort; + secretKey = UUID.randomUUID().toString(); + } + + public void start() throws IOException { + logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort); + + serverSocket = new ServerSocket(); + serverSocket.bind(new InetSocketAddress("localhost", 0)); + serverSocket.setSoTimeout(2000); + + final int localPort = serverSocket.getLocalPort(); + logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort); + + listener = new Listener(serverSocket); + final Thread listenThread = new Thread(listener); + listenThread.setDaemon(true); + listenThread.setName("Listen to Bootstrap"); + listenThread.start(); + + logger.debug("Notifying Bootstrap that local port is {}", localPort); + try (final Socket socket = new Socket()) { + socket.setSoTimeout(60000); + socket.connect(new InetSocketAddress("localhost", bootstrapPort)); + socket.setSoTimeout(60000); + + final OutputStream out = socket.getOutputStream(); + out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8)); + out.flush(); + + logger.debug("Awaiting response from Bootstrap..."); + final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + final String response = reader.readLine(); + if ("OK".equals(response)) { + logger.info("Successfully initiated communication with Bootstrap"); + } else { + logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from NiFi"); + } + } + } + + public void stop() { + if (listener != null) { + listener.stop(); + } + } + + private class Listener implements Runnable { + + private final ServerSocket serverSocket; + private final ExecutorService executor; + private volatile boolean stopped = false; + + public Listener(final ServerSocket serverSocket) { + this.serverSocket = serverSocket; + this.executor = Executors.newFixedThreadPool(2); + } + + public void stop() { + stopped = true; + + executor.shutdownNow(); + + try { + serverSocket.close(); + } catch (final IOException ioe) { + // nothing to really do here. we could log this, but it would just become + // confusing in the logs, as we're shutting down and there's no real benefit + } + } + + @Override + public void run() { + while (!stopped) { + try { + final Socket socket; + try { + logger.debug("Listening for Bootstrap Requests"); + socket = serverSocket.accept(); + } catch (final SocketTimeoutException ste) { + if (stopped) { + return; + } + + continue; + } catch (final IOException ioe) { + if (stopped) { + return; + } + + throw ioe; + } + + logger.debug("Received connection from Bootstrap"); + socket.setSoTimeout(5000); + + executor.submit(new Runnable() { + @Override + public void run() { + try { + final BootstrapRequest request = readRequest(socket.getInputStream()); + final BootstrapRequest.RequestType requestType = request.getRequestType(); + + switch (requestType) { + case PING: + logger.debug("Received PING request from Bootstrap; responding"); + echoPing(socket.getOutputStream()); + logger.debug("Responded to PING request from Bootstrap"); + break; + case SHUTDOWN: + logger.info("Received SHUTDOWN request from Bootstrap"); + echoShutdown(socket.getOutputStream()); + nifi.shutdownHook(); + return; + case DUMP: + logger.info("Received DUMP request from Bootstrap"); + writeDump(socket.getOutputStream()); + break; + } + } catch (final Throwable t) { + logger.error("Failed to process request from Bootstrap due to " + t.toString(), t); + } finally { + try { + socket.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString()); + } + } + } + }); + } catch (final Throwable t) { + logger.error("Failed to process request from Bootstrap due to " + t.toString(), t); + } + } + } + } + + private static void writeDump(final OutputStream out) throws IOException { final ThreadMXBean mbean = ManagementFactory.getThreadMXBean(); final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out)); - + final ThreadInfo[] infos = mbean.dumpAllThreads(true, true); final long[] deadlockedThreadIds = mbean.findDeadlockedThreads(); final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads(); - + final List<ThreadInfo> sortedInfos = new ArrayList<>(infos.length); - for ( final ThreadInfo info : infos ) { + for (final ThreadInfo info : infos) { sortedInfos.add(info); } Collections.sort(sortedInfos, new Comparator<ThreadInfo>() { @@ -215,14 +214,14 @@ public class BootstrapListener { return o1.getThreadName().toLowerCase().compareTo(o2.getThreadName().toLowerCase()); } }); - + final StringBuilder sb = new StringBuilder(); - for ( final ThreadInfo info : sortedInfos ) { + for (final ThreadInfo info : sortedInfos) { sb.append("\n"); sb.append("\"").append(info.getThreadName()).append("\" Id="); sb.append(info.getThreadId()).append(" "); sb.append(info.getThreadState().toString()).append(" "); - + switch (info.getThreadState()) { case BLOCKED: case TIMED_WAITING: @@ -233,66 +232,66 @@ public class BootstrapListener { default: break; } - + if (info.isSuspended()) { sb.append(" (suspended)"); } - if ( info.isInNative() ) { + if (info.isInNative()) { sb.append(" (in native code)"); } - - if ( deadlockedThreadIds != null && deadlockedThreadIds.length > 0 ) { - for ( final long id : deadlockedThreadIds ) { - if ( id == info.getThreadId() ) { + + if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) { + for (final long id : deadlockedThreadIds) { + if (id == info.getThreadId()) { sb.append(" ** DEADLOCKED THREAD **"); } } } - if ( monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0 ) { - for ( final long id : monitorDeadlockThreadIds ) { - if ( id == info.getThreadId() ) { + if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) { + for (final long id : monitorDeadlockThreadIds) { + if (id == info.getThreadId()) { sb.append(" ** MONITOR-DEADLOCKED THREAD **"); } } } final StackTraceElement[] stackTraces = info.getStackTrace(); - for ( final StackTraceElement element : stackTraces ) { + for (final StackTraceElement element : stackTraces) { sb.append("\n\tat ").append(element); - + final MonitorInfo[] monitors = info.getLockedMonitors(); - for ( final MonitorInfo monitor : monitors ) { - if ( monitor.getLockedStackFrame().equals(element) ) { + for (final MonitorInfo monitor : monitors) { + if (monitor.getLockedStackFrame().equals(element)) { sb.append("\n\t- waiting on ").append(monitor); } } } - + final LockInfo[] lockInfos = info.getLockedSynchronizers(); - if ( lockInfos.length > 0 ) { + if (lockInfos.length > 0) { sb.append("\n\t"); sb.append("Number of Locked Synchronizers: ").append(lockInfos.length); - for ( final LockInfo lockInfo : lockInfos ) { + for (final LockInfo lockInfo : lockInfos) { sb.append("\n\t- ").append(lockInfo.toString()); } } - + sb.append("\n"); } - + if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) { sb.append("\n\nDEADLOCK DETECTED!"); sb.append("\nThe following thread IDs are deadlocked:"); - for ( final long id : deadlockedThreadIds ) { + for (final long id : deadlockedThreadIds) { sb.append("\n").append(id); } } - if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) { + if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) { sb.append("\n\nMONITOR DEADLOCK DETECTED!"); sb.append("\nThe following thread IDs are deadlocked:"); - for ( final long id : monitorDeadlockThreadIds ) { + for (final long id : monitorDeadlockThreadIds) { sb.append("\n").append(id); } } @@ -300,79 +299,79 @@ public class BootstrapListener { writer.write(sb.toString()); writer.flush(); } - - private void echoPing(final OutputStream out) throws IOException { - out.write("PING\n".getBytes(StandardCharsets.UTF_8)); - out.flush(); - } - - private void echoShutdown(final OutputStream out) throws IOException { - out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8)); - out.flush(); - } - - - @SuppressWarnings("resource") // we don't want to close the stream, as the caller will do that + + private void echoPing(final OutputStream out) throws IOException { + out.write("PING\n".getBytes(StandardCharsets.UTF_8)); + out.flush(); + } + + private void echoShutdown(final OutputStream out) throws IOException { + out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8)); + out.flush(); + } + + @SuppressWarnings("resource") // we don't want to close the stream, as the caller will do that private BootstrapRequest readRequest(final InputStream in) throws IOException { - // We want to ensure that we don't try to read data from an InputStream directly - // by a BufferedReader because any user on the system could open a socket and send - // a multi-gigabyte file without any new lines in order to crash the NiFi instance - // (or at least cause OutOfMemoryErrors, which can wreak havoc on the running instance). - // So we will limit the Input Stream to only 4 KB, which should be plenty for any request. - final LimitingInputStream limitingIn = new LimitingInputStream(in, 4096); - final BufferedReader reader = new BufferedReader(new InputStreamReader(limitingIn)); - - final String line = reader.readLine(); - final String[] splits = line.split(" "); - if ( splits.length < 1 ) { - throw new IOException("Received invalid request from Bootstrap: " + line); - } - - final String requestType = splits[0]; - final String[] args; - if ( splits.length == 1 ) { - throw new IOException("Received invalid request from Bootstrap; request did not have a secret key; request type = " + requestType); - } else if ( splits.length == 2 ) { - args = new String[0]; - } else { - args = Arrays.copyOfRange(splits, 2, splits.length); - } - - final String requestKey = splits[1]; - if ( !secretKey.equals(requestKey) ) { - throw new IOException("Received invalid Secret Key for request type " + requestType); - } - - try { - return new BootstrapRequest(requestType, args); - } catch (final Exception e) { - throw new IOException("Received invalid request from Bootstrap; request type = " + requestType); - } - } - - - private static class BootstrapRequest { - public static enum RequestType { - SHUTDOWN, - DUMP, - PING; - } - - private final RequestType requestType; - private final String[] args; - - public BootstrapRequest(final String request, final String[] args) { - this.requestType = RequestType.valueOf(request); - this.args = args; - } - - public RequestType getRequestType() { - return requestType; - } - - @SuppressWarnings("unused") + // We want to ensure that we don't try to read data from an InputStream directly + // by a BufferedReader because any user on the system could open a socket and send + // a multi-gigabyte file without any new lines in order to crash the NiFi instance + // (or at least cause OutOfMemoryErrors, which can wreak havoc on the running instance). + // So we will limit the Input Stream to only 4 KB, which should be plenty for any request. + final LimitingInputStream limitingIn = new LimitingInputStream(in, 4096); + final BufferedReader reader = new BufferedReader(new InputStreamReader(limitingIn)); + + final String line = reader.readLine(); + final String[] splits = line.split(" "); + if (splits.length < 1) { + throw new IOException("Received invalid request from Bootstrap: " + line); + } + + final String requestType = splits[0]; + final String[] args; + if (splits.length == 1) { + throw new IOException("Received invalid request from Bootstrap; request did not have a secret key; request type = " + requestType); + } else if (splits.length == 2) { + args = new String[0]; + } else { + args = Arrays.copyOfRange(splits, 2, splits.length); + } + + final String requestKey = splits[1]; + if (!secretKey.equals(requestKey)) { + throw new IOException("Received invalid Secret Key for request type " + requestType); + } + + try { + return new BootstrapRequest(requestType, args); + } catch (final Exception e) { + throw new IOException("Received invalid request from Bootstrap; request type = " + requestType); + } + } + + private static class BootstrapRequest { + + public static enum RequestType { + + SHUTDOWN, + DUMP, + PING; + } + + private final RequestType requestType; + private final String[] args; + + public BootstrapRequest(final String request, final String[] args) { + this.requestType = RequestType.valueOf(request); + this.args = args; + } + + public RequestType getRequestType() { + return requestType; + } + + @SuppressWarnings("unused") public String[] getArgs() { - return args; - } - } + return args; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java index e166f8e..ef2377f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java @@ -47,11 +47,12 @@ public class NiFi { private static final Logger logger = LoggerFactory.getLogger(NiFi.class); private final NiFiServer nifiServer; private final BootstrapListener bootstrapListener; - + public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port"; private volatile boolean shutdown = false; - public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { + public NiFi(final NiFiProperties properties) + throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override public void uncaughtException(final Thread t, final Throwable e) { @@ -70,24 +71,24 @@ public class NiFi { })); final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY); - if ( bootstrapPort != null ) { - try { - final int port = Integer.parseInt(bootstrapPort); - - if (port < 1 || port > 65535) { - throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); - } - - bootstrapListener = new BootstrapListener(this, port); - bootstrapListener.start(); - } catch (final NumberFormatException nfe) { - throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); - } + if (bootstrapPort != null) { + try { + final int port = Integer.parseInt(bootstrapPort); + + if (port < 1 || port > 65535) { + throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); + } + + bootstrapListener = new BootstrapListener(this, port); + bootstrapListener.start(); + } catch (final NumberFormatException nfe) { + throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535"); + } } else { - logger.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap"); - bootstrapListener = null; + logger.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap"); + bootstrapListener = null; } - + // delete the web working dir - if the application does not start successfully // the web app directories might be in an invalid state. when this happens // jetty will not attempt to re-extract the war into the directory. by removing @@ -118,7 +119,7 @@ public class NiFi { // discover the extensions ExtensionManager.discoverExtensions(); ExtensionManager.logClassLoaderMapping(); - + DocGenerator.generate(properties); // load the server from the framework classloader @@ -129,27 +130,27 @@ public class NiFi { final long startTime = System.nanoTime(); nifiServer = (NiFiServer) jettyConstructor.newInstance(properties); nifiServer.setExtensionMapping(extensionMapping); - - if ( shutdown ) { - logger.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller"); + + if (shutdown) { + logger.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller"); } else { - nifiServer.start(); - - final long endTime = System.nanoTime(); - logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds."); + nifiServer.start(); + + final long endTime = System.nanoTime(); + logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds."); } } protected void shutdownHook() { try { - this.shutdown = true; - + this.shutdown = true; + logger.info("Initiating shutdown of Jetty web server..."); if (nifiServer != null) { nifiServer.stop(); } if (bootstrapListener != null) { - bootstrapListener.stop(); + bootstrapListener.stop(); } logger.info("Jetty web server shutdown completed (nicely or otherwise)."); } catch (final Throwable t) { @@ -164,10 +165,10 @@ public class NiFi { final int minRequiredOccurrences = 25; final int maxOccurrencesOutOfRange = 15; final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis()); - + final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() { private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); - + @Override public Thread newThread(final Runnable r) { final Thread t = defaultFactory.newThread(r); @@ -176,7 +177,7 @@ public class NiFi { return t; } }); - + final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0); final AtomicInteger occurences = new AtomicInteger(0); final Runnable command = new Runnable() { @@ -202,7 +203,8 @@ public class NiFi { service.shutdownNow(); if (occurences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) { - logger.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause Processors to be scheduled erratically. Please see the NiFi documentation for more information."); + logger.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause " + + "Processors to be scheduled erratically. Please see the NiFi documentation for more information."); } } }; @@ -213,7 +215,7 @@ public class NiFi { /** * Main entry point of the application. * - * @param args + * @param args things which are ignored */ public static void main(String[] args) { logger.info("Launching NiFi...");
