Repository: nifi Updated Branches: refs/heads/master af2513adf -> 23937835f
NIFI-5092 - Removed local state management for S2S Bulletins RT Signed-off-by: Matthew Burgess <[email protected]> This closes #2643 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/23937835 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/23937835 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/23937835 Branch: refs/heads/master Commit: 23937835f30ffa4082360f71ce0526d794f514bd Parents: af2513a Author: Pierre Villard <[email protected]> Authored: Wed Apr 18 12:21:04 2018 +0200 Committer: Matthew Burgess <[email protected]> Committed: Wed Apr 25 16:25:00 2018 -0400 ---------------------------------------------------------------------- .../SiteToSiteBulletinReportingTask.java | 24 ------ .../TestSiteToSiteBulletinReportingTask.java | 77 -------------------- 2 files changed, 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/23937835/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java index 20ed96a..d026aa1 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java @@ -19,13 +19,11 @@ package org.apache.nifi.reporting; import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restriction; -import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.RequiredPermission; -import org.apache.nifi.components.state.Scope; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; @@ -57,7 +55,6 @@ import java.util.concurrent.TimeUnit; @CapabilityDescription("Publishes Bulletin events using the Site To Site protocol. Note: only up to 5 bulletins are stored per component and up to " + "10 bulletins at controller level for a duration of up to 5 minutes. If this reporting task is not scheduled frequently enough some bulletins " + "may not be sent.") -@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last bulletin ID so that on restart the task knows where it left off.") @Restricted( restrictions = { @Restriction( @@ -98,19 +95,6 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting return; } - if (lastSentBulletinId < 0) { - Map<String, String> state; - try { - state = context.getStateManager().getState(Scope.LOCAL).toMap(); - } catch (IOException e) { - getLogger().error("Failed to get state at start up due to:" + e.getMessage(), e); - return; - } - if (state.containsKey(LAST_EVENT_ID_KEY)) { - lastSentBulletinId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)); - } - } - final BulletinQuery bulletinQuery = new BulletinQuery.Builder().after(lastSentBulletinId).build(); final List<Bulletin> bulletins = context.getBulletinRepository().findBulletins(bulletinQuery); @@ -181,14 +165,6 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting throw new ProcessException("Failed to send Bulletins to destination due to IOException:" + e.getMessage(), e); } - // Store the id of the last event so we know where we left off - try { - context.getStateManager().setState(Collections.singletonMap(LAST_EVENT_ID_KEY, String.valueOf(currMaxId)), Scope.LOCAL); - } catch (final IOException ioe) { - getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart.", - new Object[]{currMaxId, ioe.getMessage()}, ioe); - } - lastSentBulletinId = currMaxId; } http://git-wip-us.apache.org/repos/asf/nifi/blob/23937835/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java index 72b8ff3..6d70442 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java @@ -38,14 +38,11 @@ import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.state.Scope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.reporting.AbstractSiteToSiteReportingTask.NiFiUrlValidator; -import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.MockPropertyValue; import org.junit.Assert; import org.junit.Test; @@ -91,7 +88,6 @@ public class TestSiteToSiteBulletinReportingTask { // creating reporting task final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask(); - Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(task)); // settings properties and mocking access to properties final Map<PropertyDescriptor, String> properties = new HashMap<>(); @@ -127,69 +123,6 @@ public class TestSiteToSiteBulletinReportingTask { assertEquals("group-name", bulletinJson.getString("bulletinGroupName")); } - @Test - public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() throws IOException, InitializationException { - // creating the list of bulletins - final List<Bulletin> bulletins = new ArrayList<Bulletin>(); - bulletins.add(BulletinFactory.createBulletin("category", "severity", "message")); - bulletins.add(BulletinFactory.createBulletin("category", "severity", "message")); - bulletins.add(BulletinFactory.createBulletin("category", "severity", "message")); - bulletins.add(BulletinFactory.createBulletin("category", "severity", "message")); - - // mock the access to the list of bulletins - final ReportingContext context = Mockito.mock(ReportingContext.class); - final BulletinRepository repository = Mockito.mock(BulletinRepository.class); - Mockito.when(context.getBulletinRepository()).thenReturn(repository); - Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins); - - final long maxEventId = getMaxBulletinId(bulletins);; - - // create the mock reporting task and mock state manager - final MockSiteToSiteBulletinReportingTask task = new MockSiteToSiteBulletinReportingTask(); - final MockStateManager stateManager = new MockStateManager(task); - - // settings properties and mocking access to properties - final Map<PropertyDescriptor, String> properties = new HashMap<>(); - for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { - properties.put(descriptor, descriptor.getDefaultValue()); - } - properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000"); - properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi"); - properties.put(SiteToSiteBulletinReportingTask.TRANSPORT_PROTOCOL, SiteToSiteTransportProtocol.HTTP.name()); - properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_HOSTNAME, "localhost"); - properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_PORT, "80"); - properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_USERNAME, "username"); - properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_PASSWORD, "password"); - - Mockito.doAnswer(new Answer<PropertyValue>() { - @Override - public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { - final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); - return new MockPropertyValue(properties.get(descriptor)); - } - }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); - - // create the state map and set the last id to the same value as maxEventId - final Map<String,String> state = new HashMap<>(); - state.put(SiteToSiteProvenanceReportingTask.LAST_EVENT_ID_KEY, String.valueOf(maxEventId)); - stateManager.setState(state, Scope.LOCAL); - - // setup the mock reporting context to return the mock state manager - Mockito.when(context.getStateManager()).thenReturn(stateManager); - - // setup the mock initialization context - final ComponentLog logger = Mockito.mock(ComponentLog.class); - final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); - Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); - Mockito.when(initContext.getLogger()).thenReturn(logger); - - task.initialize(initContext); - - // execute the reporting task and should not produce any data b/c max id same as previous id - task.onTrigger(context); - assertEquals(0, task.dataSent.size()); - } - private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask { final List<byte[]> dataSent = new ArrayList<>(); @@ -220,14 +153,4 @@ public class TestSiteToSiteBulletinReportingTask { } - private Long getMaxBulletinId(List<Bulletin> bulletins) { - long result = -1L; - for (Bulletin bulletin : bulletins) { - if(bulletin.getId() > result) { - result = bulletin.getId(); - } - } - return result; - } - }
