Repository: nifi
Updated Branches:
  refs/heads/master eaefec6d8 -> b6eb0ac0f


NIFI-3859 - Provide filtering options in S2SProvenanceReportingTask

This closes #1777.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b6eb0ac0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b6eb0ac0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b6eb0ac0

Branch: refs/heads/master
Commit: b6eb0ac0fb382b5cc2e3f60196e36835437883f3
Parents: eaefec6
Author: Pierre Villard <[email protected]>
Authored: Tue May 9 20:44:43 2017 +0200
Committer: Koji Kawamura <[email protected]>
Committed: Thu May 25 14:10:50 2017 +0900

----------------------------------------------------------------------
 .../SiteToSiteProvenanceReportingTask.java      |  95 +++++++-
 .../TestSiteToSiteProvenanceReportingTask.java  | 237 ++++++++++++++++---
 2 files changed, 300 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b6eb0ac0/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 9d6d009..cae4d17 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -17,13 +17,16 @@
 
 package org.apache.nifi.reporting;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
@@ -31,6 +34,7 @@ import 
org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 
@@ -47,6 +51,7 @@ import java.nio.charset.StandardCharsets;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -55,6 +60,7 @@ import java.util.Map;
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 @Tags({"provenance", "lineage", "tracking", "site", "site to site", 
"restricted"})
 @CapabilityDescription("Publishes Provenance events using the Site To Site 
protocol.")
@@ -67,6 +73,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
 
     static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
         .name("Platform")
+        .displayName("Platform")
         .description("The value to use for the platform field in each 
provenance event.")
         .required(true)
         .expressionLanguageSupported(true)
@@ -74,12 +81,73 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
+    /**
+     * Optional comma-separated list of {@link ProvenanceEventType} names. When set, only events
+     * whose type is listed are sent; names that are not valid event types are logged and ignored
+     * when the task is scheduled.
+     */
+    static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
+        .name("s2s-prov-task-event-filter")
+        .displayName("Event type")
+        // Arrays.toString lists the constant names; concatenating the array itself would only
+        // print its type and identity hash in the property description.
+        .description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. "
+                + "Available event types are " + Arrays.toString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If "
+                        + "multiple filters are set, the filters are cumulative.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+    /**
+     * Optional regular expression applied to the component type of each provenance event.
+     * The whole component type has to match the expression for the event to be sent.
+     */
+    static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
+        .name("s2s-prov-task-type-filter")
+        .displayName("Component type")
+        .required(false)
+        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+        .description("Regular expression to filter the provenance events based on the component type. "
+                + "Only the events matching the regular expression will be sent. "
+                + "If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
+        .build();
+
+    /**
+     * Optional comma-separated list of component identifiers. When set, only events emitted by
+     * one of the listed components are sent.
+     */
+    static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder()
+        .name("s2s-prov-task-id-filter")
+        .displayName("Component ID")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. "
+                + "If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
+        .build();
+
     private volatile long firstEventId = -1L;
+    // Filtering state, (re)computed by onScheduled and read by filterEvents.
+    // True when at least one of the three filters below is active.
+    private volatile boolean isFilteringEnabled;
+    // Compiled component type filter, or null when that filter is not configured.
+    private volatile Pattern componentTypeRegex;
+    // Accepted event types; an empty list means "do not filter on event type".
+    private volatile List<ProvenanceEventType> eventTypes = new ArrayList<>();
+    // Accepted component identifiers; an empty list means "do not filter on component ID".
+    private volatile List<String> componentIds = new ArrayList<>();
+
+    /**
+     * Rebuilds the filtering state from the three optional filter properties.
+     *
+     * The event type and component ID lists are built from scratch on every call and then
+     * published, so values left over from a previous scheduling of the task never leak into
+     * the current configuration.
+     *
+     * @param context the configuration the filter properties are read from
+     * @throws IOException declared for callers; nothing in this method throws it
+     */
+    @OnScheduled
+    public void onScheduled(final ConfigurationContext context) throws IOException {
+        // initialize component type filtering
+        final String componentTypeFilter = context.getProperty(FILTER_COMPONENT_TYPE).getValue();
+        componentTypeRegex = StringUtils.isBlank(componentTypeFilter) ? null : Pattern.compile(componentTypeFilter);
+
+        // initialize event type filtering; unknown names are reported and left out
+        final List<ProvenanceEventType> configuredEventTypes = new ArrayList<>();
+        final String[] eventList = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).getValue(), ','));
+        if (eventList != null) {
+            for (final String type : eventList) {
+                try {
+                    configuredEventTypes.add(ProvenanceEventType.valueOf(type));
+                } catch (final IllegalArgumentException e) {
+                    getLogger().warn(type + " is not a correct event type, removed from the filtering.");
+                }
+            }
+        }
+        eventTypes = configuredEventTypes;
+
+        // initialize component ID filtering
+        final List<String> configuredComponentIds = new ArrayList<>();
+        final String[] componentIdList = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID).getValue(), ','));
+        if (componentIdList != null) {
+            for (final String componentId : componentIdList) {
+                // a whitespace-only token strips down to "" and could never match a component
+                if (!componentId.isEmpty()) {
+                    configuredComponentIds.add(componentId);
+                }
+            }
+        }
+        componentIds = configuredComponentIds;
+
+        // set a boolean whether filtering will be applied or not
+        isFilteringEnabled = componentTypeRegex != null || !configuredEventTypes.isEmpty() || !configuredComponentIds.isEmpty();
+    }
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
         properties.add(PLATFORM);
+        properties.add(FILTER_EVENT_TYPE);
+        properties.add(FILTER_COMPONENT_TYPE);
+        properties.add(FILTER_COMPONENT_ID);
         return properties;
     }
 
@@ -160,7 +228,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
 
         List<ProvenanceEventRecord> events;
         try {
-            events = 
context.getEventAccess().getProvenanceEvents(firstEventId, 
context.getProperty(BATCH_SIZE).asInteger());
+            events = 
filterEvents(context.getEventAccess().getProvenanceEvents(firstEventId, 
context.getProperty(BATCH_SIZE).asInteger()));
         } catch (final IOException ioe) {
             getLogger().error("Failed to retrieve Provenance Events from 
repository due to: " + ioe.getMessage(), ioe);
             return;
@@ -243,7 +311,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
 
             // Retrieve the next batch
             try {
-                events = 
context.getEventAccess().getProvenanceEvents(firstEventId, 
context.getProperty(BATCH_SIZE).asInteger());
+                events = 
filterEvents(context.getEventAccess().getProvenanceEvents(firstEventId, 
context.getProperty(BATCH_SIZE).asInteger()));
             } catch (final IOException ioe) {
                 getLogger().error("Failed to retrieve Provenance Events from 
repository due to: " + ioe.getMessage(), ioe);
                 return;
@@ -252,6 +320,29 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
 
     }
 
+    /**
+     * Applies the configured component ID, event type and component type filters to a batch.
+     * An event is kept only when it passes every filter that is active; when no filter is
+     * active the given list is returned untouched.
+     *
+     * NOTE(review): the result can be empty even though the input batch was not - confirm that
+     * callers do not interpret an empty result as "no more events in the repository".
+     *
+     * @param provenanceEvents the batch to filter
+     * @return the same list when filtering is disabled, otherwise a new list with the retained events
+     */
+    private List<ProvenanceEventRecord> filterEvents(List<ProvenanceEventRecord> provenanceEvents) {
+        if (!isFilteringEnabled) {
+            return provenanceEvents;
+        }
+
+        final List<ProvenanceEventRecord> retained = new ArrayList<>();
+        for (final ProvenanceEventRecord candidate : provenanceEvents) {
+            // Each check is only evaluated when the previous one passed, in the order ID, event type, component type.
+            final boolean idAccepted = componentIds.isEmpty()
+                    || componentIds.contains(candidate.getComponentId());
+            final boolean eventTypeAccepted = idAccepted
+                    && (eventTypes.isEmpty() || eventTypes.contains(candidate.getEventType()));
+            final boolean componentTypeAccepted = eventTypeAccepted
+                    && (componentTypeRegex == null || componentTypeRegex.matcher(candidate.getComponentType()).matches());
+
+            if (componentTypeAccepted) {
+                retained.add(candidate);
+            }
+        }
+        return retained;
+    }
+
     static JsonObject serialize(final JsonBuilderFactory factory, final 
JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat 
df,
         final String componentName, final String hostname, final URL nifiUrl, 
final String applicationName, final String platform, final String 
nodeIdentifier) {
         addField(builder, "eventId", UUID.randomUUID().toString());

http://git-wip-us.apache.org/repos/asf/nifi/blob/b6eb0ac0/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
index 9dce30b..a396ac8 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
@@ -21,6 +21,7 @@ package org.apache.nifi.reporting;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.provenance.ProvenanceEventBuilder;
@@ -32,7 +33,6 @@ import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.state.MockStateManager;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockPropertyValue;
 import org.junit.Assert;
@@ -44,6 +44,7 @@ import org.mockito.stubbing.Answer;
 import javax.json.Json;
 import javax.json.JsonObject;
 import javax.json.JsonReader;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -57,37 +58,13 @@ import static org.junit.Assert.assertEquals;
 
 public class TestSiteToSiteProvenanceReportingTask {
 
-    @Test
-    public void testSerializedForm() throws IOException, 
InitializationException {
-        final String uuid = "10000000-0000-0000-0000-000000000000";
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put("abc", "xyz");
-        attributes.put("xyz", "abc");
-        attributes.put("filename", "file-" + uuid);
-
-        final Map<String, String> prevAttrs = new HashMap<>();
-        attributes.put("filename", "1234.xyz");
-
-        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
-        builder.setEventTime(System.currentTimeMillis());
-        builder.setEventType(ProvenanceEventType.RECEIVE);
-        builder.setTransitUri("nifi://unit-test");
-        attributes.put("uuid", uuid);
-        builder.fromFlowFile(createFlowFile(3L, attributes));
-        builder.setAttributes(prevAttrs, attributes);
-        builder.setComponentId("1234");
-        builder.setComponentType("dummy processor");
-        final ProvenanceEventRecord event = builder.build();
+    // Mocks shared by the tests: passed to the task's onTrigger, initialize and onScheduled calls respectively.
+    private final ReportingContext context = Mockito.mock(ReportingContext.class);
+    private final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
+    private final ConfigurationContext confContext = Mockito.mock(ConfigurationContext.class);
 
+    private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord 
event, Map<PropertyDescriptor, String> properties) throws IOException {
         final MockSiteToSiteProvenanceReportingTask task = new 
MockSiteToSiteProvenanceReportingTask();
 
-        final Map<PropertyDescriptor, String> properties = new HashMap<>();
-        for (final PropertyDescriptor descriptor : 
task.getSupportedPropertyDescriptors()) {
-            properties.put(descriptor, descriptor.getDefaultValue());
-        }
-        properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
-
-        final ReportingContext context = Mockito.mock(ReportingContext.class);
         Mockito.when(context.getStateManager())
                 .thenReturn(new MockStateManager(task));
         Mockito.doAnswer(new Answer<PropertyValue>() {
@@ -98,6 +75,14 @@ public class TestSiteToSiteProvenanceReportingTask {
             }
         }).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
 
+        Mockito.doAnswer(new Answer<PropertyValue>() {
+            @Override
+            public PropertyValue answer(final InvocationOnMock invocation) 
throws Throwable {
+                final PropertyDescriptor descriptor = 
invocation.getArgumentAt(0, PropertyDescriptor.class);
+                return new MockPropertyValue(properties.get(descriptor));
+            }
+        
}).when(confContext).getProperty(Mockito.any(PropertyDescriptor.class));
+
         final long maxEventId = 2500;
         final AtomicInteger totalEvents = new AtomicInteger(0);
 
@@ -129,12 +114,25 @@ public class TestSiteToSiteProvenanceReportingTask {
         
Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
 
         final ComponentLog logger = Mockito.mock(ComponentLog.class);
-        final ReportingInitializationContext initContext = 
Mockito.mock(ReportingInitializationContext.class);
         
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
         Mockito.when(initContext.getLogger()).thenReturn(logger);
 
+        return task;
+    }
+
+    @Test
+    public void testSerializedForm() throws IOException, 
InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : new 
MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+
+        ProvenanceEventRecord event = createProvenanceEventRecord();
 
+        MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
         task.initialize(initContext);
+        task.onScheduled(confContext);
         task.onTrigger(context);
 
         assertEquals(3, task.dataSent.size());
@@ -145,6 +143,163 @@ public class TestSiteToSiteProvenanceReportingTask {
     }
 
     @Test
+    public void testFilterComponentIdSuccess() throws IOException, InitializationException {
+        // the event's component ID (1234) is one of the listed IDs, surrounding blanks are ignored
+        assertEquals(3, batchesSentWithFilters("2345, 5678,  1234", null, null));
+    }
+
+    @Test
+    public void testFilterComponentIdNoResult() throws IOException, InitializationException {
+        assertEquals(0, batchesSentWithFilters("9999", null, null));
+    }
+
+    @Test
+    public void testFilterComponentTypeSuccess() throws IOException, InitializationException {
+        assertEquals(3, batchesSentWithFilters(null, "dummy.*", null));
+    }
+
+    @Test
+    public void testFilterComponentTypeNoResult() throws IOException, InitializationException {
+        // the expression has to match the whole component type ("dummy processor")
+        assertEquals(0, batchesSentWithFilters(null, "proc.*", null));
+    }
+
+    @Test
+    public void testFilterEventTypeSuccess() throws IOException, InitializationException {
+        // the invalid name is dropped from the filter, RECEIVE still matches
+        assertEquals(3, batchesSentWithFilters(null, null, "RECEIVE, notExistingType, DROP"));
+    }
+
+    @Test
+    public void testFilterEventTypeNoResult() throws IOException, InitializationException {
+        assertEquals(0, batchesSentWithFilters(null, null, "DROP"));
+    }
+
+    @Test
+    public void testFilterMultiFilterNoResult() throws IOException, InitializationException {
+        // ID and component type match, but the event type does not: filters are cumulative
+        assertEquals(0, batchesSentWithFilters("2345, 5678,  1234", "dummy.*", "DROP"));
+    }
+
+    @Test
+    public void testFilterMultiFilterSuccess() throws IOException, InitializationException {
+        assertEquals(3, batchesSentWithFilters("2345, 5678,  1234", "dummy.*", "RECEIVE"));
+    }
+
+    /**
+     * Runs the reporting task once against the standard test event with the given filters and
+     * returns how many payloads the task sent.
+     *
+     * Every supported property starts at its default value and the batch size is set to 1000;
+     * a {@code null} argument leaves the corresponding filter property at its default.
+     *
+     * @param componentIdFilter value for the component ID filter, or null
+     * @param componentTypeFilter value for the component type filter, or null
+     * @param eventTypeFilter value for the event type filter, or null
+     * @return the size of the task's {@code dataSent} list after one trigger
+     */
+    private int batchesSentWithFilters(final String componentIdFilter, final String componentTypeFilter, final String eventTypeFilter)
+            throws IOException, InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+        if (componentIdFilter != null) {
+            properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, componentIdFilter);
+        }
+        if (componentTypeFilter != null) {
+            properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE, componentTypeFilter);
+        }
+        if (eventTypeFilter != null) {
+            properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, eventTypeFilter);
+        }
+
+        final MockSiteToSiteProvenanceReportingTask task = setup(createProvenanceEventRecord(), properties);
+        task.initialize(initContext);
+        task.onScheduled(confContext);
+        task.onTrigger(context);
+
+        return task.dataSent.size();
+    }
+
+    @Test
     public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() 
throws IOException, InitializationException {
         final long maxEventId = 2500;
 
@@ -194,6 +349,28 @@ public class TestSiteToSiteProvenanceReportingTask {
         return mockFlowFile;
     }
 
+    /**
+     * Builds the RECEIVE event shared by the tests: component ID "1234", component type
+     * "dummy processor", transit URI "nifi://unit-test".
+     */
+    private ProvenanceEventRecord createProvenanceEventRecord() {
+        final String flowFileUuid = "10000000-0000-0000-0000-000000000000";
+
+        final Map<String, String> currentAttrs = new HashMap<>();
+        currentAttrs.put("abc", "xyz");
+        currentAttrs.put("xyz", "abc");
+        currentAttrs.put("uuid", flowFileUuid);
+        currentAttrs.put("filename", "file-" + flowFileUuid);
+
+        // NOTE(review): the previous-attributes map stays empty and "filename" is overwritten on
+        // the current attributes instead - possibly meant to be a put on the previous map; verify.
+        final Map<String, String> previousAttrs = new HashMap<>();
+        currentAttrs.put("filename", "1234.xyz");
+
+        final ProvenanceEventBuilder eventBuilder = new StandardProvenanceEventRecord.Builder();
+        eventBuilder.setEventTime(System.currentTimeMillis());
+        eventBuilder.setEventType(ProvenanceEventType.RECEIVE);
+        eventBuilder.setTransitUri("nifi://unit-test");
+        eventBuilder.fromFlowFile(createFlowFile(3L, currentAttrs));
+        eventBuilder.setAttributes(previousAttrs, currentAttrs);
+        eventBuilder.setComponentId("1234");
+        eventBuilder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = eventBuilder.build();
+        return record;
+    }
+
     private static final class MockSiteToSiteProvenanceReportingTask extends 
SiteToSiteProvenanceReportingTask {
 
         final List<byte[]> dataSent = new ArrayList<>();

Reply via email to