This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 31097c9 NIFI-6385 Added signal.id penalization - Add additional doc
about best practices.
31097c9 is described below
commit 31097c96d6f77e1493c331f76838190d4df2b9e2
Author: Koji Kawamura <[email protected]>
AuthorDate: Thu Jun 20 10:38:22 2019 +0900
NIFI-6385 Added signal.id penalization
- Add additional doc about best practices.
This closes #3540.
Signed-off-by: Mark Payne <[email protected]>
---
.../org/apache/nifi/processors/standard/Wait.java | 55 ++++
.../additionalDetails.html | 277 +++++++++++++++++++++
.../apache/nifi/processors/standard/TestWait.java | 29 +++
3 files changed, 361 insertions(+)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index e297556..45ffcb2 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -43,6 +44,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
@@ -212,6 +214,24 @@ public class Wait extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
+ /**
+  * Optional 'Wait Penalty Duration' property. When set, a signal identifier that was
+  * processed without any FlowFile being released to 'success' is penalized for this
+  * time period, and FlowFiles carrying that identifier are skipped until the penalty expires.
+  */
+ public static final PropertyDescriptor WAIT_PENALTY_DURATION = new PropertyDescriptor.Builder()
+         .name("wait-penalty-duration")
+         .displayName("Wait Penalty Duration")
+         .description("If configured, after a signal identifier has been processed but did not meet the release criteria," +
+                 " the signal identifier is penalized and FlowFiles having that signal identifier" +
+                 " will not be processed again for the specified period of time," +
+                 " so that the penalized signal identifier does not prevent other signal identifiers from being processed." +
+                 " This can be useful for use cases where a Wait processor is expected to process multiple signal identifiers," +
+                 " each signal identifier has multiple FlowFiles," +
+                 " and the order of releasing FlowFiles within a signal identifier is important." +
+                 " The FlowFile order can be configured with Prioritizers." +
+                 " IMPORTANT: There is a limit on the number of queued signals that can be processed," +
+                 " and the Wait processor may not be able to check all queued signal ids." +
+                 " See additional details for best practices.")
+         .required(false)
+         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+         .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+         .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile with a matching release signal in the
cache will be routed to this relationship")
@@ -234,6 +254,8 @@ public class Wait extends AbstractProcessor {
private final Set<Relationship> relationships;
+ // Signal identifier -> epoch millis until which FlowFiles carrying that identifier are skipped.
+ // onTrigger iterates/removes, checks and adds entries; unless the processor is @TriggerSerially,
+ // NiFi may run onTrigger on several threads at once (multiple concurrent tasks), so a plain
+ // HashMap is not safe here. ConcurrentHashMap also supports Iterator.remove during iteration.
+ private final Map<String, Long> signalIdPenalties = new java.util.concurrent.ConcurrentHashMap<>();
+
public Wait() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
@@ -255,6 +277,7 @@ public class Wait extends AbstractProcessor {
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_COPY_MODE);
descriptors.add(WAIT_MODE);
+ descriptors.add(WAIT_PENALTY_DURATION);
return descriptors;
}
@@ -280,6 +303,19 @@ public class Wait extends AbstractProcessor {
final List<FlowFile> failedFilteringFlowFiles = new ArrayList<>();
final Supplier<FlowFileFilter.FlowFileFilterResult>
acceptResultSupplier =
() -> bufferedCount.incrementAndGet() == bufferCount ?
ACCEPT_AND_TERMINATE : ACCEPT_AND_CONTINUE;
+
+ // Clear expired penalties.
+ if (!signalIdPenalties.isEmpty()) {
+ final Iterator<Entry<String, Long>> penaltyIterator =
signalIdPenalties.entrySet().iterator();
+ final long now = System.currentTimeMillis();
+ while (penaltyIterator.hasNext()) {
+ final Entry<String, Long> penalty = penaltyIterator.next();
+ if (penalty.getValue() < now) {
+ penaltyIterator.remove();
+ }
+ }
+ }
+
final List<FlowFile> flowFiles = session.get(f -> {
final String fSignalId =
signalIdProperty.evaluateAttributeExpressions(f).getValue();
@@ -292,6 +328,11 @@ public class Wait extends AbstractProcessor {
return ACCEPT_AND_CONTINUE;
}
+ if (signalIdPenalties.containsKey(fSignalId)) {
+ // This id is penalized.
+ return REJECT_AND_CONTINUE;
+ }
+
final String targetSignalIdStr = targetSignalId.get();
if (targetSignalIdStr == null) {
// This is the first one.
@@ -468,6 +509,12 @@ public class Wait extends AbstractProcessor {
// Transfer FlowFiles.
processedFlowFiles.entrySet().forEach(transferFlowFiles);
+ // Penalize signal id if no FlowFile transferred to success.
+ final PropertyValue waitPenaltyDuration =
context.getProperty(WAIT_PENALTY_DURATION);
+ if (waitPenaltyDuration.isSet() &&
getFlowFilesFor.apply(REL_SUCCESS).isEmpty()) {
+ signalIdPenalties.put(signalId, System.currentTimeMillis() +
waitPenaltyDuration.asTimePeriod(TimeUnit.MILLISECONDS));
+ }
+
// Update signal if needed.
try {
if (waitCompleted) {
@@ -515,4 +562,12 @@ public class Wait extends AbstractProcessor {
return session.putAllAttributes(flowFile, attributesToCopy);
}
+ /**
+  * Discards every signal identifier penalty when the processor is stopped, so that
+  * no signal identifier is still treated as penalized after the processor is restarted.
+  *
+  * @param context the process context (not used)
+  */
+ @OnStopped
+ public void onStopped(final ProcessContext context) {
+     this.signalIdPenalties.clear();
+ }
+
+ Map<String, Long> getSignalIdPenalties() {
+ return signalIdPenalties;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.Wait/additionalDetails.html
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.Wait/additionalDetails.html
new file mode 100644
index 0000000..4f54271
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.Wait/additionalDetails.html
@@ -0,0 +1,277 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8"/>
+ <title>Wait</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css"/>
+ <style>
+ table td:first-child {text-align: center;}
+ </style>
+
+</head>
+
+<body>
+
+<h2>Best practices to handle multiple signal ids at a Wait processor</h2>
+
+When a Wait processor is expected to process multiple signal ids, by configuring 'Release Signal Identifier' with an Expression Language expression that references a FlowFile attribute, there are a few things to consider in order to get the expected result. Processor configuration can vary based on your requirements.
+
+Also, you will need a high-level understanding of how the Wait processor works:
+<ul>
+ <li>The Wait processor only processes a single signal id at a time</li>
+ <li>How frequently the Wait processor runs is defined by 'Run Schedule'</li>
+ <li>Which FlowFile is processed is determined by the configured Prioritizer(s)</li>
+ <li>This applies to all processors, not only Wait: the order of queued FlowFiles in a connection is undefined if no Prioritizer is set</li>
+</ul>
+
+
+See the following sections for common patterns:
+<ul>
+ <li><a href="#asap">Release any FlowFile as soon as its signal is
notified</a></li>
+ <li><a href="#higher-priority">Release higher priority FlowFiles in
each signal id</a></li>
+</ul>
+
+<h3 id="asap">Release any FlowFile as soon as its signal is notified</h3>
+
+This is the most common use case.
+FlowFiles are independent and can be released in any order.
+
+<h4>Important configurations:</h4>
+<ul>
+ <li>Use FirstInFirstOutPrioritizer (FIFO) at the 'wait' relationship (or at the incoming connection if 'Wait Mode' is 'Keep in the upstream connection')</li>
+</ul>
+
+The following table illustrates the notified signal ids, queued FlowFiles and
what will happen at each Wait run cycle.
+
+<table>
+ <tbody>
+ <tr>
+ <th># of Wait run</th>
+ <th>Notified Signals</th>
+ <th>Queue Index (FIFO)</th>
+ <th>FlowFile UUID</th>
+ <th>Signal ID</th>
+ <th> </th>
+ </tr>
+ <tr>
+ <td rowspan="3">1</td>
+ <td rowspan="3">B</td>
+ <td>1</td>
+ <td>a</td>
+ <td>A</td>
+ <td>This FlowFile is processed, but its signal is not found, so it is re-queued at the end of the queue.</td>
+ </tr>
+ <tr>
+ <td>2</td>
+ <td>b</td>
+ <td>B</td>
+ <td> </td>
+ </tr>
+ <tr>
+ <td>3</td>
+ <td>c</td>
+ <td>C</td>
+ <td> </td>
+ </tr>
+ <tr>
+ <td rowspan="3">2</td>
+ <td rowspan="3">B</td>
+ <td>1</td>
+ <td>b</td>
+ <td>B</td>
+ <td>This FlowFile is processed and since its signal is
notified, this one will be released to 'success'.</td>
+ </tr>
+ <tr>
+ <td>2</td>
+ <td>c</td>
+ <td>C</td>
+ <td> </td>
+ </tr>
+ <tr>
+ <td>3</td>
+ <td>a</td>
+ <td>A</td>
+ <td> </td>
+ </tr>
+ <tr>
+ <td rowspan="3">3</td>
+ <td rowspan="3"> </td>
+ <td>1</td>
+ <td>c</td>
+ <td>C</td>
+ <td>This FlowFile will be processed at the next run.</td>
+ </tr>
+ <tr>
+ <td>2</td>
+ <td>a</td>
+ <td>A</td>
+ <td> </td>
+ </tr>
+ </tbody>
+</table>
+
+
+
+<h3 id="higher-priority">Release higher priority FlowFiles in each signal
id</h3>
+
+Multiple FlowFiles share the same signal id, and the order of releasing a
FlowFile is important.
+
+<h4>Important configurations:</h4>
+<ul>
+ <li>Use a Prioritizer (or a set of Prioritizers) other than FIFO that suits your needs at the 'wait' relationship (or at the incoming connection if 'Wait Mode' is 'Keep in the upstream connection'), e.g. PriorityPrioritizer</li>
+ <li>Specify an adequate 'Wait Penalty Duration', e.g. "3 sec"</li>
+ <li>'Wait Penalty Duration' should be greater than 'Run Schedule', e.g. "3 sec" &gt; "1 sec"</li>
+ <li>Increase 'Run Duration' to avoid hitting the limit on the number of signal ids that can be checked (see the <a href="#run-duration">note</a> below)</li>
+</ul>
+
+The following table illustrates the notified signal ids, queued FlowFiles and
what will happen at each Wait run cycle.
+The example uses PriorityPrioritizer to control the order of processing
FlowFiles within a signal id.
+
+If 'Wait Penalty Duration' is configured, the Wait processor tracks unreleased signal ids along with their penalty expiration times, which indicate when they will be checked again.
+
+<table>
+ <tbody>
+ <tr>
+ <th># of Wait run</th>
+ <th>Notified Signals</th>
+ <th>Signal Penalties</th>
+ <th>Queue Index (via 'priority' attribute)</th>
+ <th>FlowFile UUID</th>
+ <th>Signal ID</th>
+ <th>'priority' attr</th>
+ <th> </th>
+ </tr>
+ <tr>
+ <td rowspan="3">1 (00:01)</td>
+ <td rowspan="3">B</td>
+ <td rowspan="3"> </td>
+ <td>1</td>
+ <td>a-1</td>
+ <td>A</td>
+ <td>1</td>
+ <td>This FlowFile is processed. But its signal is not found.
Penalized.</td>
+ </tr>
+ <tr>
+ <td>2</td>
+ <td>b-1</td>
+ <td>B</td>
+ <td>1</td>
+ <td>Since a-1 and b-1 have the same priority '1', b-1 may be
processed before a-1. You can add another Prioritizer to define more specific
ordering.</td>
+ </tr>
+ <tr>
+ <td>3</td>
+ <td>b-2</td>
+ <td>B</td>
+ <td>2</td>
+ <td> </td>
+ </tr>
+ <tr>
+ <td rowspan="3">2 (00:02)</td>
+ <td rowspan="3">B</td>
+ <td rowspan="3">A (00:04)</td>
+ <td>1</td>
+ <td>a-1</td>
+ <td>A</td>
+ <td>1</td>
+ <td>This FlowFile is the first one according to the configured
Prioritizer, but the signal id is penalized. So, this FlowFile is skipped at
this execution.</td>
+ </tr>
+ <tr>
+ <td>2</td>
+ <td>b-1</td>
+ <td>B</td>
+ <td>1</td>
+ <td>This FlowFile is processed.</td>
+ </tr>
+ <tr>
+ <td>3</td>
+ <td>b-2</td>
+ <td>B</td>
+ <td>2</td>
+ <td> </td>
+ </tr>
+ <tr>
+ <td rowspan="2">3 (00:03)</td>
+ <td rowspan="2"> </td>
+ <td rowspan="2">A (00:04)</td>
+ <td>1</td>
+ <td>a-1</td>
+ <td>A</td>
+ <td>1</td>
+ <td>This FlowFile is the first one but is still penalized.</td>
+ </tr>
+ <tr>
+ <td>2</td>
+ <td>b-2</td>
+ <td>B</td>
+ <td>2</td>
+ <td>This FlowFile is processed, but its signal is not notified yet, so it will be penalized.</td>
+ </tr>
+ <tr>
+ <td rowspan="2">4 (00:04)</td>
+ <td rowspan="2"> </td>
+ <td rowspan="2">B (00:06)</td>
+ <td>1</td>
+ <td>a-1</td>
+ <td>A</td>
+ <td>1</td>
+ <td>This FlowFile is no longer penalized and gets processed, but its signal is not notified yet, so it is penalized again.</td>
+ </tr>
+ <tr>
+ <td>2</td>
+ <td>b-2</td>
+ <td>B</td>
+ <td>2</td>
+ <td> </td>
+ </tr>
+ </tbody>
+</table>
+
+<h4 id="run-duration">The importance of 'Run Duration' when 'Wait Penalty
Duration' is used</h4>
+
+<p>
+There is a limit on the number of signals that can be checked, determined by the combination of 'Run Schedule' and 'Wait Penalty Duration'.
+If this limit is reached, some FlowFiles may not be processed and may remain in the 'wait' relationship even if their signal ids are notified.
+Let's say Wait is configured with:
+</p>
+
+<ul>
+ <li>Run Schedule = 1 sec</li>
+ <li>Wait Penalty Duration = 3 sec</li>
+ <li>Release Signal Identifier = ${uuid}</li>
+</ul>
+
+<p>
+And there are 5 FlowFiles F1, F2 ... F5 in the 'wait' relationship.
+Then the signal for F5 is notified.
+Wait will work as follows:
+</p>
+
+<ul>
+ <li>At 00:00 Wait checks the signal for F1, does not find it, and penalizes F1 (till 00:03)</li>
+ <li>At 00:01 Wait checks the signal for F2, does not find it, and penalizes F2 (till 00:04)</li>
+ <li>At 00:02 Wait checks the signal for F3, does not find it, and penalizes F3 (till 00:05)</li>
+ <li>At 00:03 Wait checks the signal for F4, does not find it, and penalizes F4 (till 00:06)</li>
+ <li>At 00:04 Wait checks the signal for F1 again, because it is no longer penalized</li>
+</ul>
+<p>This cycle repeats, so F5 will not be released until one of F1 ... F4 is released.</p>
+
+<p>
+To mitigate this limit, increasing 'Run Duration' is recommended. With a larger 'Run Duration', the Wait processor keeps being scheduled for that duration. For example, with a 'Run Duration' of 500 ms, Wait should be able to loop through all 5 queued FlowFiles in a single run.
+</p>
+
+</body>
+</html>
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
index 2ccb2fe..7970601 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
@@ -206,6 +206,35 @@ public class TestWait {
}
@Test
+ public void testWaitPenaltyDuration() throws InitializationException {
+     runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+     runner.setProperty(Wait.WAIT_PENALTY_DURATION, "1 hour");
+
+     final Map<String, String> props = new HashMap<>();
+     props.put("releaseSignalAttribute", "1");
+     runner.enqueue(new byte[]{}, props);
+
+     // No signal has been notified for id "1": the FlowFile is routed to 'wait' and,
+     // since nothing was released to 'success', the id gets penalized.
+     runner.run(1, false);
+
+     runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+     runner.clearTransferState();
+
+     // The signal id should be penalized
+     final Wait processor = (Wait) runner.getProcessor();
+     final Map<String, Long> signalIdPenalties = processor.getSignalIdPenalties();
+     assertEquals(1, signalIdPenalties.size());
+     assertTrue(signalIdPenalties.containsKey("1"));
+
+     // A FlowFile with the penalized id must be skipped: nothing is transferred and
+     // the FlowFile stays in the incoming queue instead of being consumed.
+     runner.enqueue(new byte[]{}, props);
+
+     runner.run(1, false);
+
+     runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 0);
+     assertEquals(1, runner.getQueueSize().getObjectCount());
+
+     // Stopping the processor (@OnStopped) must discard every penalty.
+     runner.run(1, true);
+     runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 0);
+     assertTrue(signalIdPenalties.isEmpty());
+ }
+
+ @Test
public void testReplaceAttributes() throws InitializationException,
IOException {
Map<String, String> cachedAttributes = new HashMap<>();
cachedAttributes.put("both", "notifyValue");