[
https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237890#comment-15237890
]
ASF GitHub Bot commented on MINIFI-13:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi-minifi/pull/6#discussion_r59445052
--- Diff:
minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.provenance.reporting;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestProvenanceReportingTask {
+
+ @Test
+ public void testSerializedForm() throws IOException,
InitializationException {
+ final String uuid = "10000000-0000-0000-0000-000000000000";
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("abc", "xyz");
+ attributes.put("xyz", "abc");
+ attributes.put("filename", "file-" + uuid);
+
+ final Map<String, String> prevAttrs = new HashMap<>();
+ attributes.put("filename", "1234.xyz");
+
+ final Set<String> lineageIdentifiers = new HashSet<>();
+ lineageIdentifiers.add("123");
+ lineageIdentifiers.add("321");
+
+ final ProvenanceEventBuilder builder = new
StandardProvenanceEventRecord.Builder();
+ builder.setEventTime(System.currentTimeMillis());
+ builder.setEventType(ProvenanceEventType.RECEIVE);
+ builder.setTransitUri("nifi://unit-test");
+ attributes.put("uuid", uuid);
+ builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+ builder.setAttributes(prevAttrs, attributes);
+ builder.setComponentId("1234");
+ builder.setComponentType("dummy processor");
+ builder.setLineageIdentifiers(lineageIdentifiers);
+ final ProvenanceEventRecord event = builder.build();
+
+ final List<byte[]> dataSent = new ArrayList<>();
+ final ProvenanceReportingTask task = new ProvenanceReportingTask()
{
+ @SuppressWarnings("unchecked")
+ @Override
+ protected SiteToSiteClient getClient() {
+ final SiteToSiteClient client =
Mockito.mock(SiteToSiteClient.class);
+ final Transaction transaction =
Mockito.mock(Transaction.class);
+
+ try {
+ Mockito.doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock
invocation) throws Throwable {
+ final byte[] data =
invocation.getArgumentAt(0, byte[].class);
+ dataSent.add(data);
+ return null;
+ }
+ }).when(transaction).send(Mockito.any(byte[].class),
Mockito.any(Map.class));
+
+
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.toString());
+ }
+
+ return client;
+ }
+ };
+
+ final List<ProvenanceEventRecord> events = new ArrayList<>();
+ events.add(event);
+
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ for (final PropertyDescriptor descriptor :
task.getSupportedPropertyDescriptors()) {
+ properties.put(descriptor, descriptor.getDefaultValue());
+ }
+ properties.put(ProvenanceReportingTask.BATCH_SIZE, "1000");
+
+ final ReportingContext context =
Mockito.mock(ReportingContext.class);
+ Mockito.when(context.getStateManager())
+ .thenReturn(new MockStateManager(task));
+ Mockito.doAnswer(new Answer<PropertyValue>() {
+ @Override
+ public PropertyValue answer(final InvocationOnMock invocation)
throws Throwable {
+ final PropertyDescriptor descriptor =
invocation.getArgumentAt(0, PropertyDescriptor.class);
+ return new MockPropertyValue(properties.get(descriptor),
null);
+ }
+
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
+
+ final EventAccess eventAccess = Mockito.mock(EventAccess.class);
+ Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() {
+ @Override
+ public List<ProvenanceEventRecord> answer(final
InvocationOnMock invocation) throws Throwable {
+ final long startId = invocation.getArgumentAt(0,
long.class);
+ final int maxRecords = invocation.getArgumentAt(1,
int.class);
+
+ final List<ProvenanceEventRecord> eventsToReturn = new
ArrayList<>();
+ for (int i = (int) Math.max(0, startId); i < (int)
(startId + maxRecords) && i < events.size(); i++) {
+ eventsToReturn.add(events.get(i));
+ }
+ return eventsToReturn;
+ }
+ }).when(eventAccess).getProvenanceEvents(Mockito.anyLong(),
Mockito.anyInt());
+
+ Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
+
+ final ComponentLog logger = Mockito.mock(ComponentLog.class);
+ final ReportingInitializationContext initContext =
Mockito.mock(ReportingInitializationContext.class);
+
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+ Mockito.when(initContext.getLogger()).thenReturn(logger);
+
+
+ task.initialize(initContext);
+ task.onTrigger(context);
+
+ assertEquals(1, dataSent.size());
+ final String msg = new String(dataSent.get(0),
StandardCharsets.UTF_8);
+ System.out.println(msg);
--- End diff --
I agree, I am going change it to verify the message is getting transferred
properly.
> Create a Reporting Task to Send Provenance data
> -----------------------------------------------
>
> Key: MINIFI-13
> URL: https://issues.apache.org/jira/browse/MINIFI-13
> Project: Apache NiFi MiNiFi
> Issue Type: Sub-task
> Components: Data Format, Data Transmission
> Reporter: Joseph Percivall
> Assignee: Joseph Percivall
> Fix For: 0.0.1
>
>
> With initial effort to re-use as much of NiFi as possible it is not possible
> to easily create a ProvenanceReporter to add provenance events as attributes
> to FlowFiles as it would require changing the ProvenancenReporter interface.
> This will require utilizing a different extension point to transmit the
> provenance data back to a core NiFi instance.
> Probably the most efficient way to do this is to create a ReportingTask which
> reports the provenance events using the S2S protocol.
> In the future this will probably be retired as a reporting task as MiNiFi
> grows to rely less on NiFi but this Reporting task could also be contributed
> back to NiFi.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)