[ 
https://issues.apache.org/jira/browse/NIFI-190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15754493#comment-15754493
 ] 

ASF GitHub Bot commented on NIFI-190:
-------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1329#discussion_r92815826
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
 ---
    @@ -0,0 +1,266 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.distributed.cache.client.Deserializer;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
    +import org.apache.nifi.expression.AttributeExpression.ResultType;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import 
org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
    +
    +@EventDriven
    +@SupportsBatching
    +@Tags({"map", "cache", "wait", "hold", "distributed", "signal", "release"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Routes incoming FlowFiles to the 'wait' 
relationship until a matching release signal "
    +        + "is stored in the distributed cache from a corresponding Notify 
processor.  At this point, a waiting FlowFile is routed to "
    +        + "the 'success' relationship, with attributes copied from the 
FlowFile that produced "
    +        + "the release signal from the Notify processor.  The release 
signal entry is then removed from "
    +        + "the cache.  Waiting FlowFiles will be routed to 'expired' if 
they exceed the Expiration Duration.")
    +@WritesAttribute(attribute = "wait.start.timestamp", description = "All 
FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
    +        + "initial epoch timestamp when the file first entered this 
processor.  This is used to determine the expiration time of the FlowFile.")
    +@SeeAlso(classNames = 
{"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", 
"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
    +        "org.apache.nifi.processors.standard.Notify"})
    +public class Wait extends AbstractProcessor {
    +
    +    public static final String WAIT_START_TIMESTAMP = 
"wait.start.timestamp";
    +
    +    // Identifies the distributed map cache client
    +    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new 
PropertyDescriptor.Builder()
    +        .name("Distributed Cache Service")
    +        .description("The Controller Service that is used to check for 
release signals from a corresponding Notify processor")
    +        .required(true)
    +        .identifiesControllerService(DistributedMapCacheClient.class)
    +        .build();
    +
    +    // Selects the FlowFile attribute or expression, whose value is used 
as cache key
    +    public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new 
PropertyDescriptor.Builder()
    +        .name("Release Signal Identifier")
    +        .description("A value, or the results of an Attribute Expression 
Language statement, which will " +
    +            "be evaluated against a FlowFile in order to determine the 
release signal cache key")
    +        .required(true)
    +        
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING,
 true))
    +        .expressionLanguageSupported(true)
    +        .build();
    +
    +    // Selects the FlowFile attribute or expression, whose value is used 
as cache key
    +    public static final PropertyDescriptor EXPIRATION_DURATION = new 
PropertyDescriptor.Builder()
    +        .name("Expiration Duration")
    +        .description("Indicates the duration after which waiting flow 
files will be routed to the 'expired' relationship")
    +        .required(true)
    +        .defaultValue("10 min")
    +        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +        .expressionLanguageSupported(false)
    +        .build();
    +
    +    public static final AllowableValue ATTRIBUTE_COPY_REPLACE = new 
AllowableValue("replace", "Replace if present",
    +            "When cached attributes are copied onto released FlowFiles, 
they replace any matching attributes.");
    +
    +    public static final AllowableValue ATTRIBUTE_COPY_KEEP_ORIGINAL = new 
AllowableValue("keeporiginal", "Keep original",
    +            "Attributes on released FlowFiles are not overwritten by 
copied cached attributes.");
    +
    +    public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new 
PropertyDescriptor.Builder()
    +        .name("Attribute Copy Mode")
    +        .description("Specifies how to handle attributes copied from flow 
files entering the Notify processor")
    +        .defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue())
    +        .required(true)
    +        .allowableValues(ATTRIBUTE_COPY_REPLACE, 
ATTRIBUTE_COPY_KEEP_ORIGINAL)
    +        .expressionLanguageSupported(false)
    +        .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +        .name("success")
    +        .description("A FlowFile with a matching release signal in the 
cache will be routed to this relationship")
    +        .build();
    +
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +        .name("failure")
    +        .description("When the cache cannot be reached, or if the Release 
Signal Identifier evaluates to null or empty, FlowFiles will be routed to this 
relationship")
    +        .build();
    +
    +    public static final Relationship REL_WAIT = new Relationship.Builder()
    +        .name("wait")
    +        .description("A FlowFile with no matching release signal in the 
cache will be routed to this relationship")
    +        .build();
    +
    +    public static final Relationship REL_EXPIRED = new 
Relationship.Builder()
    +        .name("expired")
    +        .description("A FlowFile that has exceeded the configured 
Expiration Duration will be routed to this relationship")
    +        .build();
    +    private final Set<Relationship> relationships;
    +
    +    private final Serializer<String> keySerializer = new 
StringSerializer();
    +    private final Deserializer<Map<String, String>> valueDeserializer = 
new FlowFileAttributesSerializer();
    +
    +    public Wait() {
    +        final Set<Relationship> rels = new HashSet<>();
    +        rels.add(REL_SUCCESS);
    +        rels.add(REL_WAIT);
    +        rels.add(REL_EXPIRED);
    +        rels.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(rels);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
    +        descriptors.add(EXPIRATION_DURATION);
    +        descriptors.add(DISTRIBUTED_CACHE_SERVICE);
    +        descriptors.add(ATTRIBUTE_COPY_MODE);
    +        return descriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final ComponentLog logger = getLogger();
    +
    +        // cache key is computed from attribute 
'RELEASE_SIGNAL_IDENTIFIER' with expression language support
    +        final String cacheKey = 
context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
    +
    +        // if the computed value is null, or empty, we transfer the flow 
file to failure relationship
    +        if (StringUtils.isBlank(cacheKey)) {
    +            logger.error("FlowFile {} has no attribute for given Release 
Signal Identifier", new Object[] {flowFile});
    +            flowFile = session.penalize(flowFile);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        // the cache client used to interact with the distributed cache
    +        final DistributedMapCacheClient cache = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
    +
    +        try {
    +            // check for expiration
    +            String waitStartTimestamp = 
flowFile.getAttribute(WAIT_START_TIMESTAMP);
    +            if (waitStartTimestamp == null) {
    +                waitStartTimestamp = 
String.valueOf(System.currentTimeMillis());
    +                flowFile = session.putAttribute(flowFile, 
WAIT_START_TIMESTAMP, waitStartTimestamp);
    +            }
    +
    +            long lWaitStartTimestamp = 0L;
    +            try {
    +                lWaitStartTimestamp = Long.parseLong(waitStartTimestamp);
    +            } catch (NumberFormatException nfe) {
    +                logger.error("{} has an invalid value '{}' on FlowFile 
{}", new Object[] {WAIT_START_TIMESTAMP, waitStartTimestamp, flowFile});
    +                flowFile = session.penalize(flowFile);
    +                session.transfer(flowFile, REL_FAILURE);
    +                return;
    +            }
    +            long expirationDuration = 
context.getProperty(EXPIRATION_DURATION)
    +                    .asTimePeriod(TimeUnit.MILLISECONDS);
    +            long now = System.currentTimeMillis();
    +            if (now > (lWaitStartTimestamp + expirationDuration)) {
    +                logger.warn("FlowFile {} expired after {}ms", new Object[] 
{flowFile, (now - lWaitStartTimestamp)});
    +                session.transfer(flowFile, REL_EXPIRED);
    +                return;
    +            }
    +
    +            // get notifying flow file attributes
    +            Map<String, String> cachedAttributes = cache.get(cacheKey, 
keySerializer, valueDeserializer);
    +
    +            if (cachedAttributes == null) {
    +                if (logger.isDebugEnabled()) {
    +                    logger.debug("No release signal yet for {} on FlowFile 
{}", new Object[] {cacheKey, flowFile});
    +                }
    +                session.transfer(flowFile, REL_WAIT);
    --- End diff --
    
    After I wrote this comment, I also started thinking about using the 
scheduling to control it. That seems good to me, so there is no need to do 
anything else here.


> Wait/Notify processors
> ----------------------
>
>                 Key: NIFI-190
>                 URL: https://issues.apache.org/jira/browse/NIFI-190
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Joseph Gresock
>            Assignee: Joseph Gresock
>            Priority: Minor
>             Fix For: 1.2.0
>
>         Attachments: Wait_Notify_template.xml
>
>
> Our team has developed a set of processors for the following use case:
> * Format A needs to be sent to Endpoint A
> * Format B needs to be sent to Endpoint B, but should not proceed until A has 
> reached Endpoint A.  We most commonly have this restriction when Endpoint B 
> requires some output of Endpoint A.
> The proposed Wait/Notify processors enable this functionality:
> * Wait: routes files to the 'wait' relationship until a matching Release 
> Signal Identifier is found in the distributed map cache.  Then routes them to 
> 'success' (unless they have expired)
> * Notify: stores a Release Signal Identifier in the distributed map cache, 
> optionally with attributes to copy to the outgoing matching Wait flow files.
> An example:
> Wait is configured with Release Signal Identifier = "${myId}". Its 'wait' 
> relationship routes back onto itself.
>     flowFile 1 { myId : "123" }
>     comes into the Wait processor
>     Wait checks the distributed map cache for "123", doesn't find it, and 
> routes flowFile 1 to the 'wait' relationship
> Notify is configured with Release Signal Identifier = "${myId}"
>     flowFile 2 { myId : "123" }
>     comes into the Notify processor
>     Notify puts an entry in the map for "123" with any other attributes from 
> flowFile2
> Next time flowFile 1 is processed by Wait...
>     Finds an entry for "123"
>     Removes that entry from the map
>     Copies attributes to flowFile 1
>     Sends flowFile 1 out the success relationship
> Notify will optionally cache attributes in the distributed map, as determined 
> by a regex property.  This is what allows the output of Endpoint A to pass to 
> Endpoint B, above.  Wait also allows conflicting attributes from the cache to 
> either be replaced or kept, depending on property configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to