[ 
https://issues.apache.org/jira/browse/NIFI-190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15754434#comment-15754434
 ] 

ASF GitHub Bot commented on NIFI-190:
-------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1329#discussion_r92808940
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
 ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
    +import org.apache.nifi.distributed.cache.client.Serializer;
    +import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
    +import org.apache.nifi.expression.AttributeExpression.ResultType;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import 
org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
    +
    +@EventDriven
    +@SupportsBatching
    +@Tags({"map", "cache", "notify", "distributed", "signal", "release"})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@CapabilityDescription("Caches a release signal identifier in the 
distributed cache, optionally along with "
    +        + "the FlowFile's attributes.  Any flow files held at a 
corresponding Wait processor will be "
    +        + "released once this signal in the cache is discovered.")
    +@SeeAlso(classNames = 
{"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", 
"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
    +        "org.apache.nifi.processors.standard.Wait"})
    +public class Notify extends AbstractProcessor {
    +
    +    // Identifies the distributed map cache client
    +    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new 
PropertyDescriptor.Builder()
    +        .name("Distributed Cache Service")
    +        .description("The Controller Service that is used to check for 
release signals from a corresponding Notify processor")
    +        .required(true)
    +        .identifiesControllerService(DistributedMapCacheClient.class)
    +        .build();
    +
    +    // Selects the FlowFile attribute or expression, whose value is used 
as cache key
    +    public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new 
PropertyDescriptor.Builder()
    +        .name("Release Signal Identifier")
    +        .description("A value, or the results of an Attribute Expression 
Language statement, which will " +
    +            "be evaluated against a FlowFile in order to determine the 
release signal cache key")
    +        .required(true)
    +        
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING,
 true))
    +        .expressionLanguageSupported(true)
    +        .build();
    +
    +    // Specifies an optional regex used to identify which attributes to 
cache
    +    public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new 
PropertyDescriptor.Builder()
    +        .name("Attribute Cache Regex")
    +        .description("Any attributes whose names match this regex will be 
stored in the distributed cache to be "
    +                + "copied to any FlowFiles released from a corresponding 
Wait processor.  Note that the "
    +                + "uuid attribute will not be cached regardless of this 
value.  If blank, no attributes "
    +                + "will be cached.")
    +        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
    +        .expressionLanguageSupported(false)
    +        .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +        .name("success")
    +        .description("A FlowFile with a matching release signal in the 
cache will be routed to this relationship")
    --- End diff --
    
    Minor point, but should this description be something like "All FlowFiles 
where the release signal has been successfully entered in the cache will be 
routed to this relationship"?


> Wait/Notify processors
> ----------------------
>
>                 Key: NIFI-190
>                 URL: https://issues.apache.org/jira/browse/NIFI-190
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Joseph Gresock
>            Assignee: Joseph Gresock
>            Priority: Minor
>             Fix For: 1.2.0
>
>         Attachments: Wait_Notify_template.xml
>
>
> Our team has developed a set of processors for the following use case:
> * Format A needs to be sent to Endpoint A
> * Format B needs to be sent to Endpoint B, but should not proceed until A has 
> reached Endpoint A.  We most commonly have this restriction when Endpoint B 
> requires some output of Endpoint A.
> The proposed Wait/Notify processors enable this functionality:
> * Wait: routes files to the 'wait' relationship until a matching Release 
> Signal Identifier is found in the distributed map cache.  Then routes them to 
> 'success' (unless they have expired)
> * Notify: stores a Release Signal Identifier in the distributed map cache, 
> optionally with attributes to copy to the outgoing matching Wait flow files.
> An example:
> Wait is configured with Release Signal Identifier = "${myId}". Its 'wait' 
> relationship routes back onto itself.
>     flowFile 1 { myId : "123" }
>     comes into Wait processor
>     Wait checks the distributed map cache for "123", doesn't find it, and 
> routes flowFile 1 to the 'wait' relationship
> Notify is configured with Release Signal Identifier = "${myId}"
>     flowFile 2 { myId : "123" }
>     comes into Notify processor
>     Notify puts an entry in the map for "123" with any other attributes from 
> flowFile 2
> Next time flowFile 1 is processed by Wait...
>     Finds an entry for "123"
>     Removes that entry from the map
>     Copies attributes to flowFile 1
>     Sends flowFile 1 out the success relationship
> Notify will optionally cache attributes in the distributed map, as determined 
> by a regex property.  This is what allows the output of Endpoint A to pass to 
> Endpoint B, above.  Wait also allows conflicting attributes from the cache to 
> either be replaced or kept, depending on property configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to