[
https://issues.apache.org/jira/browse/NIFI-1858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15280629#comment-15280629
]
ASF GitHub Bot commented on NIFI-1858:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/419#discussion_r62906556
--- Diff:
nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
---
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"provenance", "lineage", "tracking", "site", "site to site"})
+@CapabilityDescription("Publishes Provenance events using the Site To Site
protocol.")
+@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's
last event Id so that on restart the task knows where it left off.")
+public class SiteToSiteProvenanceReportingTask extends
AbstractSiteToSiteReportingTask {
+
+ private static final String TIMESTAMP_FORMAT =
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+ private static final String LAST_EVENT_ID_KEY = "last_event_id";
+
+ static final PropertyDescriptor PLATFORM = new
PropertyDescriptor.Builder()
+ .name("Platform")
+ .description("The value to use for the platform field in each
provenance event.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .defaultValue("nifi")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ private volatile long firstEventId = -1L;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new
ArrayList<>(super.getSupportedPropertyDescriptors());
+ properties.add(PLATFORM);
+ return properties;
+ }
+
+ private String getComponentName(final ProcessGroupStatus status, final
ProvenanceEventRecord event) {
+ if (status == null) {
+ return null;
+ }
+
+ final String componentId = event.getComponentId();
+ if (status.getId().equals(componentId)) {
+ return status.getName();
+ }
+
+ for (final ProcessorStatus procStatus :
status.getProcessorStatus()) {
+ if (procStatus.getId().equals(componentId)) {
+ return procStatus.getName();
+ }
+ }
+
+ for (final PortStatus portStatus : status.getInputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return portStatus.getName();
+ }
+ }
+
+ for (final PortStatus portStatus : status.getOutputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return portStatus.getName();
+ }
+ }
+
+ for (final RemoteProcessGroupStatus rpgStatus :
status.getRemoteProcessGroupStatus()) {
+ if (rpgStatus.getId().equals(componentId)) {
+ return rpgStatus.getName();
+ }
+ }
+
+ for (final ProcessGroupStatus childGroup :
status.getProcessGroupStatus()) {
+ final String componentName = getComponentName(childGroup,
event);
+ if (componentName != null) {
+ return componentName;
+ }
+ }
+
+ return null;
+ }
+
+ @Override
+ public void onTrigger(final ReportingContext context) {
+ final ProcessGroupStatus procGroupStatus =
context.getEventAccess().getControllerStatus();
+ final String rootGroupName = procGroupStatus == null ? null :
procGroupStatus.getName();
+
+ Long currMaxId =
context.getEventAccess().getProvenanceRepository().getMaxEventId();
+
+ if(currMaxId == null) {
+ getLogger().debug("No events to send because no events have
been created yet.");
+ return;
+ }
+
+ if (firstEventId < 0) {
+ Map<String, String> state;
+ try {
+ state =
context.getStateManager().getState(Scope.LOCAL).toMap();
+ } catch (IOException e) {
+ getLogger().error("Failed to get state at start up due to
{}:"+e.getMessage(), e);
+ return;
+ }
+ if (state.containsKey(LAST_EVENT_ID_KEY)) {
+ firstEventId =
Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
+ }
+
+ if(currMaxId < firstEventId){
+ getLogger().debug("Current provenance max id is {} which
is less than what was stored in state as the last queried event, which was {}.
This means the provenance restarted its " +
+ "ids. Restarting querying from the beginning.",
new Object[]{currMaxId, firstEventId});
+ firstEventId = -1;
+ }
+ }
+
+ if (currMaxId == (firstEventId - 1)) {
+ getLogger().debug("No events to send due to the current max id
being equal to the last id that was queried.");
+ return;
+ }
+
+ List<ProvenanceEventRecord> events;
+ try {
+ events =
context.getEventAccess().getProvenanceEvents(firstEventId,
context.getProperty(BATCH_SIZE).asInteger());
+ } catch (final IOException ioe) {
+ getLogger().error("Failed to retrieve Provenance Events from
repository due to: " + ioe.getMessage(), ioe);
+ return;
+ }
+
+ if (events == null || events.isEmpty()) {
+ getLogger().debug("No events to send due to 'events' being
null or empty.");
+ return;
+ }
+
+ final String nifiUrl =
context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
+ URL url;
+ try {
+ url = new URL(nifiUrl);
+ } catch (final MalformedURLException e1) {
+ // already validated
+ throw new AssertionError();
+ }
+
+ final String hostname = url.getHost();
+ final String platform =
context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
+
+ final Map<String, ?> config = Collections.emptyMap();
+ final JsonBuilderFactory factory =
Json.createBuilderFactory(config);
+ final JsonObjectBuilder builder = factory.createObjectBuilder();
+
+ while (events != null && !events.isEmpty()) {
+ final long start = System.nanoTime();
+
+ // Create a JSON array of all the events in the current batch
+ final JsonArrayBuilder arrayBuilder =
factory.createArrayBuilder();
+ for (final ProvenanceEventRecord event : events) {
+ arrayBuilder.add(serialize(factory, builder, event,
getComponentName(procGroupStatus, event), hostname, url, rootGroupName,
platform));
+ }
+ final JsonArray jsonArray = arrayBuilder.build();
+
+ // Send the JSON document for the current batch
+ try {
+ final Transaction transaction =
getClient().createTransaction(TransferDirection.SEND);
+ if (transaction == null) {
+ getLogger().debug("All destination nodes are
penalized; will attempt to send data later");
+ return;
+ }
+
+ final Map<String, String> attributes = new HashMap<>();
+ final String transactionId = UUID.randomUUID().toString();
+ attributes.put("reporting.task.transaction.id",
transactionId);
+
+ final byte[] data =
jsonArray.toString().getBytes(StandardCharsets.UTF_8);
+ transaction.send(data, attributes);
+ transaction.confirm();
+ transaction.complete();
+
+ final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ getLogger().info("Successfully sent {} Provenance Events
to destination in {} ms; Transaction ID = {}; First Event ID = {}",
--- End diff --
I'm OK with leaving this at INFO: from the reporting task's point of view,
it is an INFO-level event.
> Add ReportingTask for sending provenance events over Site-To-Site
> -----------------------------------------------------------------
>
> Key: NIFI-1858
> URL: https://issues.apache.org/jira/browse/NIFI-1858
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
> Fix For: 0.7.0
>
>
> Currently, anyone who wants to export Provenance events can do so through
> a ReportingTask. Rather than creating a specialized ReportingTask for each
> destination, another approach would be to send provenance events
> to another NiFi instance over site-to-site and then use the standard
> processors in that other instance to store the events in the desired
> destination.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)