Repository: nifi
Updated Branches:
  refs/heads/master af2513adf -> 23937835f


NIFI-5092 - Removed local state management for S2S Bulletins RT

Signed-off-by: Matthew Burgess <[email protected]>

This closes #2643


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/23937835
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/23937835
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/23937835

Branch: refs/heads/master
Commit: 23937835f30ffa4082360f71ce0526d794f514bd
Parents: af2513a
Author: Pierre Villard <[email protected]>
Authored: Wed Apr 18 12:21:04 2018 +0200
Committer: Matthew Burgess <[email protected]>
Committed: Wed Apr 25 16:25:00 2018 -0400

----------------------------------------------------------------------
 .../SiteToSiteBulletinReportingTask.java        | 24 ------
 .../TestSiteToSiteBulletinReportingTask.java    | 77 --------------------
 2 files changed, 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/23937835/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
index 20ed96a..d026aa1 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
@@ -19,13 +19,11 @@ package org.apache.nifi.reporting;
 
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.Restriction;
-import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.RequiredPermission;
-import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -57,7 +55,6 @@ import java.util.concurrent.TimeUnit;
 @CapabilityDescription("Publishes Bulletin events using the Site To Site 
protocol. Note: only up to 5 bulletins are stored per component and up to "
         + "10 bulletins at controller level for a duration of up to 5 minutes. 
If this reporting task is not scheduled frequently enough some bulletins "
         + "may not be sent.")
-@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's 
last bulletin ID so that on restart the task knows where it left off.")
 @Restricted(
         restrictions = {
                 @Restriction(
@@ -98,19 +95,6 @@ public class SiteToSiteBulletinReportingTask extends 
AbstractSiteToSiteReporting
             return;
         }
 
-        if (lastSentBulletinId < 0) {
-            Map<String, String> state;
-            try {
-                state = 
context.getStateManager().getState(Scope.LOCAL).toMap();
-            } catch (IOException e) {
-                getLogger().error("Failed to get state at start up due to:" + 
e.getMessage(), e);
-                return;
-            }
-            if (state.containsKey(LAST_EVENT_ID_KEY)) {
-                lastSentBulletinId = 
Long.parseLong(state.get(LAST_EVENT_ID_KEY));
-            }
-        }
-
         final BulletinQuery bulletinQuery = new 
BulletinQuery.Builder().after(lastSentBulletinId).build();
         final List<Bulletin> bulletins = 
context.getBulletinRepository().findBulletins(bulletinQuery);
 
@@ -181,14 +165,6 @@ public class SiteToSiteBulletinReportingTask extends 
AbstractSiteToSiteReporting
             throw new ProcessException("Failed to send Bulletins to 
destination due to IOException:" + e.getMessage(), e);
         }
 
-        // Store the id of the last event so we know where we left off
-        try {
-            
context.getStateManager().setState(Collections.singletonMap(LAST_EVENT_ID_KEY, 
String.valueOf(currMaxId)), Scope.LOCAL);
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to update state to {} due to {}; this 
could result in events being re-sent after a restart.",
-                    new Object[]{currMaxId, ioe.getMessage()}, ioe);
-        }
-
         lastSentBulletinId = currMaxId;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/23937835/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
index 72b8ff3..6d70442 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java
@@ -38,14 +38,11 @@ import 
org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import 
org.apache.nifi.reporting.AbstractSiteToSiteReportingTask.NiFiUrlValidator;
-import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockPropertyValue;
 import org.junit.Assert;
 import org.junit.Test;
@@ -91,7 +88,6 @@ public class TestSiteToSiteBulletinReportingTask {
 
         // creating reporting task
         final MockSiteToSiteBulletinReportingTask task = new 
MockSiteToSiteBulletinReportingTask();
-        Mockito.when(context.getStateManager()).thenReturn(new 
MockStateManager(task));
 
         // settings properties and mocking access to properties
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
@@ -127,69 +123,6 @@ public class TestSiteToSiteBulletinReportingTask {
         assertEquals("group-name", 
bulletinJson.getString("bulletinGroupName"));
     }
 
-    @Test
-    public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() 
throws IOException, InitializationException {
-        // creating the list of bulletins
-        final List<Bulletin> bulletins = new ArrayList<Bulletin>();
-        bulletins.add(BulletinFactory.createBulletin("category", "severity", 
"message"));
-        bulletins.add(BulletinFactory.createBulletin("category", "severity", 
"message"));
-        bulletins.add(BulletinFactory.createBulletin("category", "severity", 
"message"));
-        bulletins.add(BulletinFactory.createBulletin("category", "severity", 
"message"));
-
-        // mock the access to the list of bulletins
-        final ReportingContext context = Mockito.mock(ReportingContext.class);
-        final BulletinRepository repository = 
Mockito.mock(BulletinRepository.class);
-        Mockito.when(context.getBulletinRepository()).thenReturn(repository);
-        
Mockito.when(repository.findBulletins(Mockito.any(BulletinQuery.class))).thenReturn(bulletins);
-
-        final long maxEventId = getMaxBulletinId(bulletins);;
-
-        // create the mock reporting task and mock state manager
-        final MockSiteToSiteBulletinReportingTask task = new 
MockSiteToSiteBulletinReportingTask();
-        final MockStateManager stateManager = new MockStateManager(task);
-
-        // settings properties and mocking access to properties
-        final Map<PropertyDescriptor, String> properties = new HashMap<>();
-        for (final PropertyDescriptor descriptor : 
task.getSupportedPropertyDescriptors()) {
-            properties.put(descriptor, descriptor.getDefaultValue());
-        }
-        properties.put(SiteToSiteBulletinReportingTask.BATCH_SIZE, "1000");
-        properties.put(SiteToSiteBulletinReportingTask.PLATFORM, "nifi");
-        properties.put(SiteToSiteBulletinReportingTask.TRANSPORT_PROTOCOL, 
SiteToSiteTransportProtocol.HTTP.name());
-        properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_HOSTNAME, 
"localhost");
-        properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_PORT, "80");
-        properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_USERNAME, 
"username");
-        properties.put(SiteToSiteBulletinReportingTask.HTTP_PROXY_PASSWORD, 
"password");
-
-        Mockito.doAnswer(new Answer<PropertyValue>() {
-            @Override
-            public PropertyValue answer(final InvocationOnMock invocation) 
throws Throwable {
-                final PropertyDescriptor descriptor = 
invocation.getArgumentAt(0, PropertyDescriptor.class);
-                return new MockPropertyValue(properties.get(descriptor));
-            }
-        }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
-
-        // create the state map and set the last id to the same value as 
maxEventId
-        final Map<String,String> state = new HashMap<>();
-        state.put(SiteToSiteProvenanceReportingTask.LAST_EVENT_ID_KEY, 
String.valueOf(maxEventId));
-        stateManager.setState(state, Scope.LOCAL);
-
-        // setup the mock reporting context to return the mock state manager
-        Mockito.when(context.getStateManager()).thenReturn(stateManager);
-
-        // setup the mock initialization context
-        final ComponentLog logger = Mockito.mock(ComponentLog.class);
-        final ReportingInitializationContext initContext = 
Mockito.mock(ReportingInitializationContext.class);
-        
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
-        Mockito.when(initContext.getLogger()).thenReturn(logger);
-
-        task.initialize(initContext);
-
-        // execute the reporting task and should not produce any data b/c max 
id same as previous id
-        task.onTrigger(context);
-        assertEquals(0, task.dataSent.size());
-    }
-
     private static final class MockSiteToSiteBulletinReportingTask extends 
SiteToSiteBulletinReportingTask {
 
         final List<byte[]> dataSent = new ArrayList<>();
@@ -220,14 +153,4 @@ public class TestSiteToSiteBulletinReportingTask {
 
     }
 
-    private Long getMaxBulletinId(List<Bulletin> bulletins) {
-        long result = -1L;
-        for (Bulletin bulletin : bulletins) {
-            if(bulletin.getId() > result) {
-                result = bulletin.getId();
-            }
-        }
-        return result;
-    }
-
 }

Reply via email to