[
https://issues.apache.org/jira/browse/NIFI-190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15754434#comment-15754434
]
ASF GitHub Bot commented on NIFI-190:
-------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1329#discussion_r92808940
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Notify.java
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import
org.apache.nifi.distributed.cache.client.exception.SerializationException;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import
org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
+
+@EventDriven
+@SupportsBatching
+@Tags({"map", "cache", "notify", "distributed", "signal", "release"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Caches a release signal identifier in the
distributed cache, optionally along with "
+ + "the FlowFile's attributes. Any flow files held at a
corresponding Wait processor will be "
+ + "released once this signal in the cache is discovered.")
+@SeeAlso(classNames =
{"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService",
"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
+ "org.apache.nifi.processors.standard.Wait"})
+public class Notify extends AbstractProcessor {
+
+ // Identifies the distributed map cache client
+ public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new
PropertyDescriptor.Builder()
+ .name("Distributed Cache Service")
+ .description("The Controller Service that is used to check for
release signals from a corresponding Notify processor")
+ .required(true)
+ .identifiesControllerService(DistributedMapCacheClient.class)
+ .build();
+
+ // Selects the FlowFile attribute or expression, whose value is used
as cache key
+ public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new
PropertyDescriptor.Builder()
+ .name("Release Signal Identifier")
+ .description("A value, or the results of an Attribute Expression
Language statement, which will " +
+ "be evaluated against a FlowFile in order to determine the
release signal cache key")
+ .required(true)
+
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING,
true))
+ .expressionLanguageSupported(true)
+ .build();
+
+ // Specifies an optional regex used to identify which attributes to
cache
+ public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new
PropertyDescriptor.Builder()
+ .name("Attribute Cache Regex")
+ .description("Any attributes whose names match this regex will be
stored in the distributed cache to be "
+ + "copied to any FlowFiles released from a corresponding
Wait processor. Note that the "
+ + "uuid attribute will not be cached regardless of this
value. If blank, no attributes "
+ + "will be cached.")
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("A FlowFile with a matching release signal in the
cache will be routed to this relationship")
--- End diff --
Minor point, but should this be something like "All FlowFiles where the
release signal has been successfully entered in the cache will be routed to
this relationship"
> Wait/Notify processors
> ----------------------
>
> Key: NIFI-190
> URL: https://issues.apache.org/jira/browse/NIFI-190
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Joseph Gresock
> Assignee: Joseph Gresock
> Priority: Minor
> Fix For: 1.2.0
>
> Attachments: Wait_Notify_template.xml
>
>
> Our team has developed a set of processors for the following use case:
> * Format A needs to be sent to Endpoint A
> * Format B needs to be sent to Endpoint B, but should not proceed until A has
> reached Endpoint A. We most commonly have this restriction when Endpoint B
> requires some output of Endpoint A.
> The proposed Wait/Notify processors enable this functionality:
> * Wait: routes files to the 'wait' relationship until a matching Release
> Signal Identifier is found in the distributed map cache. Then routes them to
> 'success' (unless they have expired)
> * Notify: stores a Release Signal Identifier in the distributed map cache,
> optionally with attributes to copy to the outgoing matching Wait flow files.
> An example:
> Wait is configured with Release Signal Attribute = "$\{myId}". Its 'wait'
> relationship routes back onto itself.
> flowFile 1 \{ myId : "123" }
> comes into Wait processor
> Wait checks the distributed cache map for "123", doesn't find it, and is
> routed to the 'wait' relationship
> Notify is configured with Release Signal Attribute = "$\{myId}"
> flowFile 2 \{ myId : "123" }
> comes in to Notify processor
> Notify puts an entry in the map for "123" with any other attributes from
> flowFile2
> Next time flowFile 1 is processed by Wait...
> Finds an entry for "123"
> Removes that entry from the map
> Copies attributes to flowFile 1
> Sends flowFile 1 out the success relationship
> Notify will optionally cache attributes in the distributed map, as determined
> by a regex property. This is what allows the output of Endpoint A to pass to
> Endpoint B, above. Wait also allows conflicting attributes from the cache to
> either be replaced or kept, depending on property configuration.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)