Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1401#discussion_r98357001
  
    --- Diff: 
nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
 ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TimeZone;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonArray;
    +import javax.json.JsonArrayBuilder;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +import javax.json.JsonObjectBuilder;
    +
    +import org.apache.nifi.annotation.behavior.Restricted;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.configuration.DefaultSchedule;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.scheduling.SchedulingStrategy;
    +
    +@Tags({"bulletin", "site", "site to site", "restricted"})
    +@CapabilityDescription("Publishes Bulletin events using the Site To Site 
protocol. Note: only up to 5 bulletins are stored per component and up to "
    +        + "10 bulletins at controller level for a duration of up to 5 
minutes. If this reporting task is not scheduled frequently enough some 
bulletins "
    +        + "may not be sent.")
    +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's 
last bulletin ID so that on restart the task knows where it left off.")
    +@Restricted("Provides operator the ability to send sensitive details 
contained in bulletin events to any external system.")
    +@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 
min")
    +public class SiteToSiteBulletinReportingTask extends 
AbstractSiteToSiteReportingTask {
    +
    +    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +    static final String LAST_EVENT_ID_KEY = "last_event_id";
    +
    +    static final PropertyDescriptor PLATFORM = new 
PropertyDescriptor.Builder()
    +        .name("Platform")
    +        .description("The value to use for the platform field in each 
provenance event.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .defaultValue("nifi")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private volatile long lastSentBulletinId = -1L;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DESTINATION_URL);
    +        properties.add(PORT_NAME);
    +        properties.add(SSL_CONTEXT);
    +        properties.add(COMPRESS);
    +        properties.add(TIMEOUT);
    +        properties.add(PLATFORM);
    +        return properties;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ReportingContext context) {
    +
    +        final boolean isClustered = context.isClustered();
    +        final String nodeId = context.getClusterNodeIdentifier();
    +        if (nodeId == null && isClustered) {
    +            getLogger().debug("This instance of NiFi is configured for 
clustering, but the Cluster Node Identifier is not yet available. "
    +                + "Will wait for Node Identifier to be established.");
    +            return;
    +        }
    +
    +        if (lastSentBulletinId < 0) {
    +            Map<String, String> state;
    +            try {
    +                state = 
context.getStateManager().getState(Scope.LOCAL).toMap();
    +            } catch (IOException e) {
    +                getLogger().error("Failed to get state at start up due 
to:" + e.getMessage(), e);
    +                return;
    +            }
    +            if (state.containsKey(LAST_EVENT_ID_KEY)) {
    +                lastSentBulletinId = 
Long.parseLong(state.get(LAST_EVENT_ID_KEY));
    +            }
    +        }
    +
    +        final BulletinQuery bulletinQuery = new 
BulletinQuery.Builder().after(lastSentBulletinId).build();
    +        final List<Bulletin> bulletins = 
context.getBulletinRepository().findBulletins(bulletinQuery);
    +
    +        if(bulletins == null || bulletins.isEmpty()) {
    +            getLogger().debug("No events to send because no events are 
stored in the repository.");
    +            return;
    +        }
    +
    +        final Long currMaxId = getMaxBulletinId(bulletins);
    +
    +        if(currMaxId < lastSentBulletinId){
    +            getLogger().warn("Current bulletin max id is {} which is less 
than what was stored in state as the last queried event, which was {}. "
    +                    + "This means the bulletins repository restarted its 
ids. Restarting querying from the beginning.", new Object[]{currMaxId, 
lastSentBulletinId});
    +            lastSentBulletinId = -1;
    +        }
    +
    +        if (currMaxId == lastSentBulletinId) {
    +            getLogger().debug("No events to send due to the current max id 
being equal to the last id that was sent.");
    +            return;
    +        }
    +
    +        final String platform = 
context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
    +
    +        final Map<String, ?> config = Collections.emptyMap();
    +        final JsonBuilderFactory factory = 
Json.createBuilderFactory(config);
    +        final JsonObjectBuilder builder = factory.createObjectBuilder();
    +
    +        final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
    +        df.setTimeZone(TimeZone.getTimeZone("Z"));
    +
    +        final long start = System.nanoTime();
    +
    +        // Create a JSON array of all the events in the current batch
    +        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
    +        for (final Bulletin bulletin : bulletins) {
    +            if(bulletin.getId() > lastSentBulletinId) {
    +                arrayBuilder.add(serialize(factory, builder, bulletin, df, 
platform, nodeId));
    +            }
    +        }
    +        final JsonArray jsonArray = arrayBuilder.build();
    +
    +        // Send the JSON document for the current batch
    +        try {
    +            final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
    +            if (transaction == null) {
    +                getLogger().debug("All destination nodes are penalized; 
will attempt to send data later");
    +                return;
    +            }
    +
    +            final Map<String, String> attributes = new HashMap<>();
    +            final String transactionId = UUID.randomUUID().toString();
    +            attributes.put("reporting.task.transaction.id", transactionId);
    +
    +            final byte[] data = 
jsonArray.toString().getBytes(StandardCharsets.UTF_8);
    +            transaction.send(data, attributes);
    +            transaction.confirm();
    +            transaction.complete();
    +
    +            final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    +            getLogger().info("Successfully sent {} Bulletins to 
destination in {} ms; Transaction ID = {}; First Event ID = {}",
    +                    new Object[]{bulletins.size(), transferMillis, 
transactionId, bulletins.get(0).getId()});
    +        } catch (final IOException e) {
    +            throw new ProcessException("Failed to send Bulletins to 
destination due to IOException:" + e.getMessage(), e);
    +        }
    +
    +        // Store the id of the last event so we know where we left off
    +        try {
    +            StateManager stateManager = context.getStateManager();
    +            Map<String, String> newMapOfState = new HashMap<>();
    +            newMapOfState.put(LAST_EVENT_ID_KEY, 
String.valueOf(currMaxId));
    +            stateManager.setState(newMapOfState, Scope.LOCAL);
    +        } catch (final IOException ioe) {
    +            getLogger().error("Failed to update state to {} due to {}; 
this could result in events being re-sent after a restart.",
    +                    new Object[]{currMaxId, ioe.getMessage()}, ioe);
    +        }
    +
    +        lastSentBulletinId = currMaxId;
    +    }
    +
    +    private Long getMaxBulletinId(List<Bulletin> bulletins) {
    +        long result = -1L;
    +        for (Bulletin bulletin : bulletins) {
    +            if(bulletin.getId() > result) {
    +                result = bulletin.getId();
    --- End diff --
    
    A bit confused. What if several _bulletin.getId()_ are greater then 
_result_ are you expecting the last one? And if so is there some predefined 
ordering?
    Alternatively I am thinking that you may want to break out of the loop once 
_result_ is assigned, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to