Repository: nifi
Updated Branches:
  refs/heads/0.x 89d7d7721 -> c7c4d5f71


NIFI-2399 This closes #722. Correcting comparison of maxEventId against 
lastEventId in SiteToSiteProvenanceReportingTask


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c7c4d5f7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c7c4d5f7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c7c4d5f7

Branch: refs/heads/0.x
Commit: c7c4d5f714e4329486291f462927680ac66083d9
Parents: 89d7d77
Author: Bryan Bende <[email protected]>
Authored: Mon Jul 25 21:19:40 2016 -0400
Committer: joewitt <[email protected]>
Committed: Mon Jul 25 23:48:08 2016 -0400

----------------------------------------------------------------------
 .../SiteToSiteProvenanceReportingTask.java      |   6 +-
 .../TestSiteToSiteProvenanceReportingTask.java  | 136 +++++++++++++------
 2 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c7c4d5f7/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 4a897f7..5af8c9f 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -62,8 +62,8 @@ import java.util.concurrent.TimeUnit;
 @Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's 
last event Id so that on restart the task knows where it left off.")
 public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReportingTask {
 
-    private static final String TIMESTAMP_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-    private static final String LAST_EVENT_ID_KEY = "last_event_id";
+    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+    static final String LAST_EVENT_ID_KEY = "last_event_id";
 
     static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
         .name("Platform")
@@ -138,7 +138,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
                 firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 
1;
             }
 
-            if(currMaxId < firstEventId){
+            if(currMaxId < (firstEventId - 1)){
                 getLogger().warn("Current provenance max id is {} which is 
less than what was stored in state as the last queried event, which was {}. 
This means the provenance restarted its " +
                         "ids. Restarting querying from the beginning.", new 
Object[]{currMaxId, firstEventId});
                 firstEventId = -1;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c7c4d5f7/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
index 12a5e4c..f5379b5 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
@@ -17,21 +17,9 @@
 
 package org.apache.nifi.reporting;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.provenance.ProvenanceEventBuilder;
@@ -56,6 +44,18 @@ import org.mockito.stubbing.Answer;
 import javax.json.Json;
 import javax.json.JsonObject;
 import javax.json.JsonReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
 
 public class TestSiteToSiteProvenanceReportingTask {
 
@@ -86,33 +86,7 @@ public class TestSiteToSiteProvenanceReportingTask {
         builder.setLineageIdentifiers(lineageIdentifiers);
         final ProvenanceEventRecord event = builder.build();
 
-        final List<byte[]> dataSent = new ArrayList<>();
-        final SiteToSiteProvenanceReportingTask task = new 
SiteToSiteProvenanceReportingTask() {
-            @SuppressWarnings("unchecked")
-            @Override
-            protected SiteToSiteClient getClient() {
-                final SiteToSiteClient client = 
Mockito.mock(SiteToSiteClient.class);
-                final Transaction transaction = 
Mockito.mock(Transaction.class);
-
-                try {
-                    Mockito.doAnswer(new Answer<Object>() {
-                        @Override
-                        public Object answer(final InvocationOnMock 
invocation) throws Throwable {
-                            final byte[] data = invocation.getArgumentAt(0, 
byte[].class);
-                            dataSent.add(data);
-                            return null;
-                        }
-                    }).when(transaction).send(Mockito.any(byte[].class), 
Mockito.any(Map.class));
-
-                    
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
-                } catch (final Exception e) {
-                    e.printStackTrace();
-                    Assert.fail(e.toString());
-                }
-
-                return client;
-            }
-        };
+        final MockSiteToSiteProvenanceReportingTask task = new 
MockSiteToSiteProvenanceReportingTask();
 
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
         for (final PropertyDescriptor descriptor : 
task.getSupportedPropertyDescriptors()) {
@@ -170,18 +144,94 @@ public class TestSiteToSiteProvenanceReportingTask {
         task.initialize(initContext);
         task.onTrigger(context);
 
-        assertEquals(3, dataSent.size());
-        final String msg = new String(dataSent.get(0), StandardCharsets.UTF_8);
+        assertEquals(3, task.dataSent.size());
+        final String msg = new String(task.dataSent.get(0), 
StandardCharsets.UTF_8);
         JsonReader jsonReader = Json.createReader(new 
ByteArrayInputStream(msg.getBytes()));
         JsonObject msgArray = 
jsonReader.readArray().getJsonObject(0).getJsonObject("updatedAttributes");
         assertEquals(msgArray.getString("abc"), 
event.getAttributes().get("abc"));
     }
 
+    @Test
+    public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() 
throws IOException, InitializationException {
+        final long maxEventId = 2500;
+
+        // create the mock reporting task and mock state manager
+        final MockSiteToSiteProvenanceReportingTask task = new 
MockSiteToSiteProvenanceReportingTask();
+        final MockStateManager stateManager = new MockStateManager(task);
+
+        // create the state map and set the last id to the same value as 
maxEventId
+        final Map<String,String> state = new HashMap<>();
+        state.put(SiteToSiteProvenanceReportingTask.LAST_EVENT_ID_KEY, 
String.valueOf(maxEventId));
+        stateManager.setState(state, Scope.LOCAL);
+
+        // setup the mock reporting context to return the mock state manager
+        final ReportingContext context = Mockito.mock(ReportingContext.class);
+        Mockito.when(context.getStateManager()).thenReturn(stateManager);
+
+        // setup the mock provenance repository to return maxEventId
+        final ProvenanceEventRepository provenanceRepository = 
Mockito.mock(ProvenanceEventRepository.class);
+        Mockito.doAnswer(new Answer<Long>() {
+            @Override
+            public Long answer(final InvocationOnMock invocation) throws 
Throwable {
+                return maxEventId;
+            }
+        }).when(provenanceRepository).getMaxEventId();
+
+        // setup the mock EventAccess to return the mock provenance repository
+        final EventAccess eventAccess = Mockito.mock(EventAccess.class);
+        Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
+        
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
+
+        // setup the mock initialization context
+        final ComponentLog logger = Mockito.mock(ComponentLog.class);
+        final ReportingInitializationContext initContext = 
Mockito.mock(ReportingInitializationContext.class);
+        
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+        Mockito.when(initContext.getLogger()).thenReturn(logger);
+
+        task.initialize(initContext);
+
+        // execute the reporting task; it should not produce any data because the max 
id equals the last event id stored in state
+        task.onTrigger(context);
+        assertEquals(0, task.dataSent.size());
+    }
+
     public static FlowFile createFlowFile(final long id, final Map<String, 
String> attributes) {
         MockFlowFile mockFlowFile = new MockFlowFile(id);
         mockFlowFile.putAttributes(attributes);
         return mockFlowFile;
     }
 
+    private static final class MockSiteToSiteProvenanceReportingTask extends 
SiteToSiteProvenanceReportingTask {
+
+        final List<byte[]> dataSent = new ArrayList<>();
+
+        @Override
+        protected SiteToSiteClient getClient() {
+            final SiteToSiteClient client = 
Mockito.mock(SiteToSiteClient.class);
+            final Transaction transaction = Mockito.mock(Transaction.class);
+
+            try {
+                Mockito.doAnswer(new Answer<Object>() {
+                    @Override
+                    public Object answer(final InvocationOnMock invocation) 
throws Throwable {
+                        final byte[] data = invocation.getArgumentAt(0, 
byte[].class);
+                        dataSent.add(data);
+                        return null;
+                    }
+                }).when(transaction).send(Mockito.any(byte[].class), 
Mockito.any(Map.class));
+
+                
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+            } catch (final Exception e) {
+                e.printStackTrace();
+                Assert.fail(e.toString());
+            }
+
+            return client;
+        }
+
+        public List<byte[]> getDataSent() {
+            return dataSent;
+        }
+    }
 
-}
+}
\ No newline at end of file

Reply via email to