Repository: nifi Updated Branches: refs/heads/master 9f23ec360 -> c1b99d584
NIFI-4028 - fix cache update when Wait releases flow files NIFI-4028: Refactored Wait processor. - Consolidated implementation for the cases of releasableFlowCount is 1 or more, in order to reduce complexity and behavior differences - Added 'consumed' counter when total counter is used to release incoming FlowFiles - Fixed method name typo, releaseCandidates This closes #2055. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c1b99d58 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c1b99d58 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c1b99d58 Branch: refs/heads/master Commit: c1b99d584d93bb6f26ba0986f2fcaf663b0caef3 Parents: 9f23ec3 Author: Pierre Villard <[email protected]> Authored: Thu Aug 3 22:56:27 2017 +0200 Committer: Koji Kawamura <[email protected]> Committed: Thu Aug 17 08:30:07 2017 +0900 ---------------------------------------------------------------------- .../apache/nifi/processors/standard/Wait.java | 29 ++--- .../processors/standard/WaitNotifyProtocol.java | 36 ++++-- .../nifi/processors/standard/TestWait.java | 60 +++++++++- .../standard/TestWaitNotifyProtocol.java | 109 ++++++++++++++++++- 4 files changed, 203 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c1b99d58/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java index fccd443..4e5ae5d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java @@ -305,6 +305,8 @@ public class Wait extends AbstractProcessor { final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue(); final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode); final AtomicReference<Signal> signalRef = new AtomicReference<>(); + // This map contains original counts before those are consumed to release incoming FlowFiles. + final HashMap<String, Long> originalSignalCounts = new HashMap<>(); final Consumer<FlowFile> transferToFailure = flowFile -> { flowFile = session.penalize(flowFile); @@ -324,7 +326,7 @@ public class Wait extends AbstractProcessor { } final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream() - .map(f -> copySignalAttributes(session, f, signalRef.get(), replaceOriginalAttributes)).collect(Collectors.toList()); + .map(f -> copySignalAttributes(session, f, signalRef.get(), originalSignalCounts, replaceOriginalAttributes)).collect(Collectors.toList()); session.transfer(flowFilesWithSignalAttributes, relationship); }; @@ -349,6 +351,9 @@ public class Wait extends AbstractProcessor { // get notifying signal try { signal = protocol.getSignal(signalId); + if (signal != null) { + originalSignalCounts.putAll(signal.getCounts()); + } signalRef.set(signal); } catch (final IOException e) { throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e); @@ -423,29 +428,20 @@ public class Wait extends AbstractProcessor { boolean waitProgressed = false; if (signal != null && !candidates.isEmpty()) { - if (releasableFlowFileCount > 1) { - signal.releaseCandidatese(targetCounterName, targetCount, releasableFlowFileCount, candidates, + if (releasableFlowFileCount > 0) { + signal.releaseCandidates(targetCounterName, targetCount, releasableFlowFileCount, candidates, released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released), waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting)); + waitCompleted = signal.getTotalCount() == 0 && signal.getReleasableCount() == 0; waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty(); } else { - // releasableFlowFileCount = 0 or 1 boolean reachedTargetCount = StringUtils.isBlank(targetCounterName) ? signal.isTotalCountReached(targetCount) : signal.isCountReached(targetCounterName, targetCount); if (reachedTargetCount) { - if (releasableFlowFileCount == 0) { - getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates); - } else { - // releasableFlowFileCount = 1 - getFlowFilesFor.apply(REL_SUCCESS).add(candidates.remove(0)); - getFlowFilesFor.apply(REL_WAIT).addAll(candidates); - // If releasableFlowFileCount == 0, leave signal as it is, - // so that any number of FlowFile can be released as long as target count condition matches. - waitCompleted = true; - } + getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates); } else { getFlowFilesFor.apply(REL_WAIT).addAll(candidates); } @@ -470,7 +466,7 @@ public class Wait extends AbstractProcessor { } - private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) { + private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount, final boolean replaceOriginal) { if (signal == null) { return flowFile; } @@ -488,8 +484,7 @@ public class Wait extends AbstractProcessor { } // Copy counter attributes - final Map<String, Long> counts = signal.getCounts(); - final long totalCount = counts.entrySet().stream().mapToLong(e -> { + final long totalCount = originalCount.entrySet().stream().mapToLong(e -> { final Long count = e.getValue(); attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count)); return count; http://git-wip-us.apache.org/repos/asf/nifi/blob/c1b99d58/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java index 6183455..2c9c9fd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.standard; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; @@ -45,6 +46,7 @@ public class WaitNotifyProtocol { private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class); public static final String DEFAULT_COUNT_NAME = "default"; + public static final String CONSUMED_COUNT_NAME = "consumed"; private static final int MAX_REPLACE_RETRY_COUNT = 5; private static final int REPLACE_RETRY_WAIT_MILLIS = 10; @@ -86,9 +88,13 @@ public class WaitNotifyProtocol { this.attributes = attributes; } + @JsonIgnore + public long getTotalCount() { + return counts.values().stream().mapToLong(Long::longValue).sum(); + } + public boolean isTotalCountReached(final long targetCount) { - final long totalCount = counts.values().stream().mapToLong(Long::longValue).sum(); - return totalCount >= targetCount; + return getTotalCount() >= targetCount; } public boolean isCountReached(final String counterName, final long targetCount) { @@ -96,6 +102,10 @@ public class WaitNotifyProtocol { } public long getCount(final String counterName) { + if (counterName == null || counterName.isEmpty()) { + return getTotalCount(); + } + final Long count = counts.get(counterName); return count != null ? count : 0; } @@ -115,7 +125,7 @@ public class WaitNotifyProtocol { * Caller of this method is responsible for updating cache storage after processing released and waiting candidates * by calling {@link #replace(Signal)}. Caller should rollback what it processed with these candidates if complete call failed.</p> * - * @param _counterName signal counter name to consume from. + * @param counterName signal counter name to consume from. If not specified, total counter is used, and 'consumed' counter is added to subtract consumed counters from total counter. * @param requiredCountForPass number of required signals to acquire a pass. * @param releasableCandidateCountPerPass number of releasable candidate per pass. * @param candidates candidates waiting for being allowed to pass. @@ -123,12 +133,9 @@ public class WaitNotifyProtocol { * @param waiting function to process candidates those should remain in waiting queue. * @param <E> Type of candidate */ - public <E> void releaseCandidatese(final String _counterName, final long requiredCountForPass, - final int releasableCandidateCountPerPass, final List<E> candidates, - final Consumer<List<E>> released, final Consumer<List<E>> waiting) { - - // counterName is mandatory otherwise, we can't decide which counter to convert into pass count. - final String counterName = _counterName == null || _counterName.length() == 0 ? DEFAULT_COUNT_NAME : _counterName; + public <E> void releaseCandidates(final String counterName, final long requiredCountForPass, + final int releasableCandidateCountPerPass, final List<E> candidates, + final Consumer<List<E>> released, final Consumer<List<E>> waiting) { final int candidateSize = candidates.size(); if (releasableCount < candidateSize) { @@ -137,11 +144,18 @@ public class WaitNotifyProtocol { final long signalCount = getCount(counterName); releasableCount += (signalCount / requiredCountForPass) * releasableCandidateCountPerPass; final long reducedSignalCount = signalCount % requiredCountForPass; - counts.put(counterName, reducedSignalCount); + if (counterName != null && !counterName.isEmpty()) { + // Update target counter with reduced count. + counts.put(counterName, reducedSignalCount); + } else { + // If target counter name is not specified, add consumed count to subtract from accumulated total count. + Long consumedCount = counts.getOrDefault(CONSUMED_COUNT_NAME, 0L); + consumedCount -= signalCount - reducedSignalCount; + counts.put(CONSUMED_COUNT_NAME, consumedCount); + } } int releaseCount = Math.min(releasableCount, candidateSize); - released.accept(candidates.subList(0, releaseCount)); waiting.accept(candidates.subList(releaseCount, candidateSize)); http://git-wip-us.apache.org/repos/asf/nifi/blob/c1b99d58/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java index 71187b6..a4df2f3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -459,8 +460,65 @@ public class TestWait { runner.clearTransferState(); - assertNull("The key no longer exist", protocol.getSignal("key")); + } + + @Test + public void testDecrementCache() throws ConcurrentModificationException, IOException { + Map<String, String> cachedAttributes = new HashMap<>(); + cachedAttributes.put("both", "notifyValue"); + cachedAttributes.put("uuid", "notifyUuid"); + cachedAttributes.put("notify.only", "notifyValue"); + + // Setup existing cache entry. + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service); + + // A flow file comes in Notify and increment the counter + protocol.notify("key", "counter", 1, cachedAttributes); + + // another flow files comes in Notify and increment the counter + protocol.notify("key", "counter", 1, cachedAttributes); + + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}"); + runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "1"); + runner.assertValid(); + + final Map<String, String> waitAttributes = new HashMap<>(); + waitAttributes.put("releaseSignalAttribute", "key"); + waitAttributes.put("signalCounterName", "counter"); + waitAttributes.put("wait.only", "waitValue"); + waitAttributes.put("both", "waitValue"); + waitAttributes.put("uuid", UUID.randomUUID().toString()); + String flowFileContent = "content"; + runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes); + + /* + * 1st iteration + */ + runner.run(); + runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); + MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + outputFlowFile.assertAttributeEquals("wait.counter.counter", "2"); + + // expect counter to be decremented to 0 and releasable count remains 1. + assertEquals("0", Long.toString(protocol.getSignal("key").getCount("counter"))); + assertEquals("1", Long.toString(protocol.getSignal("key").getReleasableCount())); + // introduce a second flow file with the same signal attribute + runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes); + + /* + * 2nd iteration + */ + runner.clearTransferState(); + runner.run(); + runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); + outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + // All counters are consumed. + outputFlowFile.assertAttributeEquals("wait.counter.counter", "0"); + + assertNull("The key no longer exist", protocol.getSignal("key")); + runner.clearTransferState(); } private class TestIteration { http://git-wip-us.apache.org/repos/asf/nifi/blob/c1b99d58/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java index e3f982c..01983d5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java @@ -37,6 +37,7 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.nifi.processors.standard.WaitNotifyProtocol.CONSUMED_COUNT_NAME; import static org.apache.nifi.processors.standard.WaitNotifyProtocol.DEFAULT_COUNT_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -267,12 +268,12 @@ public class TestWaitNotifyProtocol { final List<Integer> waiting = new ArrayList<>(); // Test default name. - final String counterName = null; + final String counterName = DEFAULT_COUNT_NAME; final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> { released.clear(); waiting.clear(); - signal.releaseCandidatese(counterName, requiredCountForPass, releasableCandidatePerPass, candidates, + signal.releaseCandidates(counterName, requiredCountForPass, releasableCandidatePerPass, candidates, r -> released.addAll(r), w -> waiting.addAll(w)); }; @@ -336,4 +337,108 @@ public class TestWaitNotifyProtocol { } + + @Test + public void testReleaseCandidateTotal() throws Exception { + final List<Integer> candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList()); + final Signal signal = new Signal(); + final List<Integer> released = new ArrayList<>(); + final List<Integer> waiting = new ArrayList<>(); + + // Test empty counter name, should use total counters. + final String emptyCounterName = null; + + final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> { + released.clear(); + waiting.clear(); + signal.releaseCandidates(emptyCounterName, requiredCountForPass, releasableCandidatePerPass, candidates, + r -> released.addAll(r), w -> waiting.addAll(w)); + }; + + final String counterA = "counterA"; + final String counterB = "counterB"; + final String counterC = "counterC"; + + final Field releasableCount = Signal.class.getDeclaredField("releasableCount"); + releasableCount.setAccessible(true); + + // No counter, should wait. + releaseCandidate.accept(3L, 1); + assertEquals(0, released.size()); + assertEquals(10, waiting.size()); + assertEquals(0, signal.getCount(emptyCounterName)); + assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // Counter is not enough yet. + signal.getCounts().put(counterA, 1L); + signal.getCounts().remove(CONSUMED_COUNT_NAME); + releaseCandidate.accept(3L, 1); + assertEquals(0, released.size()); + assertEquals(10, waiting.size()); + assertEquals(1, signal.getCount(emptyCounterName)); // Counter incremented, but not enough + assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // Counter reached the target. + signal.getCounts().put(counterA, 1L); + signal.getCounts().put(counterB, 1L); + signal.getCounts().put(counterC, 1L); + signal.getCounts().remove(CONSUMED_COUNT_NAME); + releaseCandidate.accept(3L, 1); + assertEquals(1, released.size()); + assertEquals(9, waiting.size()); + assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release + assertEquals(-3, signal.getCount(CONSUMED_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // Counter reached the target for two candidates. + signal.getCounts().put(counterA, 1L); + signal.getCounts().put(counterB, 2L); + signal.getCounts().put(counterC, 3L); + signal.getCounts().remove(CONSUMED_COUNT_NAME); + releaseCandidate.accept(3L, 1); + assertEquals(2, released.size()); + assertEquals(8, waiting.size()); + assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release + assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // Counter reached the target for two candidates, and reminder is 2. + signal.getCounts().put(counterA, 3L); + signal.getCounts().put(counterB, 3L); + signal.getCounts().put(counterC, 5L); + signal.getCounts().remove(CONSUMED_COUNT_NAME); + releaseCandidate.accept(3L, 1); + assertEquals(3, released.size()); // 11 / 3 = 3 + assertEquals(7, waiting.size()); + assertEquals(2, signal.getCount(emptyCounterName)); + assertEquals(-9, signal.getCount(CONSUMED_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // Counter reached the target for two pass count and each pass can release 2 candidates. + signal.getCounts().put(counterA, 1L); + signal.getCounts().put(counterB, 2L); + signal.getCounts().put(counterC, 3L); + signal.getCounts().remove(CONSUMED_COUNT_NAME); + releaseCandidate.accept(3L, 2); + assertEquals(4, released.size()); // (6 / 3) * 2 = 4 + assertEquals(6, waiting.size()); + assertEquals(0, signal.getCount(emptyCounterName)); + assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // If there are counts more than enough to release current candidates, unused releasableCount should remain. + signal.getCounts().put(counterA, 10L); + signal.getCounts().put(counterB, 20L); + signal.getCounts().put(counterC, 20L); + signal.getCounts().remove(CONSUMED_COUNT_NAME); + releaseCandidate.accept(3L, 2); + assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10. + assertEquals(0, waiting.size()); + assertEquals(2, signal.getCount(emptyCounterName)); // 50 % 3 = 2. + assertEquals(-48, signal.getCount(CONSUMED_COUNT_NAME)); // 50 % 3 = 2. + assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22. + + } } \ No newline at end of file
